diff --git a/bridge_test.go b/bridge_test.go index 7aa3abe..f45cefa 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -14,12 +14,20 @@ package main import ( + "fmt" + "net" "reflect" "testing" + "time" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/listener" + "github.com/prometheus/statsd_exporter/pkg/mapper" ) func TestHandlePacket(t *testing.T) { @@ -417,11 +425,35 @@ func TestHandlePacket(t *testing.T) { }, } - for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{listener.StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} { + for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + UDPPackets: udpPackets, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + }, &mockStatsDTCPListener{listener.StatsDTCPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, + }, log.NewNopLogger()}} { events := make(chan event.Events, 32) l.SetEventHandler(&event.UnbufferedEventHandler{C: events}) for i, scenario := range scenarios { - l.HandlePacket([]byte(scenario.in), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) + l.HandlePacket([]byte(scenario.in)) le := len(events) // Flatten actual events. @@ -442,3 +474,225 @@ func TestHandlePacket(t *testing.T) { } } } + +type statsDPacketHandler interface { + HandlePacket(packet []byte) + SetEventHandler(eh event.EventHandler) +} + +type mockStatsDTCPListener struct { + listener.StatsDTCPListener + log.Logger +} + +func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) { + // Forcing IPv4 because the TravisCI build environment does not have IPv6 + // addresses. + lc, err := net.ListenTCP("tcp4", nil) + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: listen failed: %v", err)) + } + + defer lc.Close() + + go func() { + cc, err := net.DialTCP("tcp", nil, lc.Addr().(*net.TCPAddr)) + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: dial failed: %v", err)) + } + + defer cc.Close() + + n, err := cc.Write(packet) + if err != nil || n != len(packet) { + panic(fmt.Sprintf("mockStatsDTCPListener: write failed: %v,%d", err, n)) + } + }() + + sc, err := lc.AcceptTCP() + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) + } + ml.HandleConn(sc) +} + +// TestTtlExpiration validates expiration of time series. +// foobar metric without mapping should expire with default ttl of 1s +// bazqux metric should expire with ttl of 2s +func TestTtlExpiration(t *testing.T) { + // Mock a time.NewTicker + tickerCh := make(chan time.Time) + clock.ClockInstance = &clock.Clock{ + TickerCh: tickerCh, + } + + config := ` +defaults: + ttl: 1s +mappings: +- match: bazqux.* + name: bazqux + ttl: 2s +` + // Create mapper from config and start an Exporter with a synchronous channel + testMapper := &mapper.MetricMapper{} + err := testMapper.InitFromYAMLString(config, 0) + if err != nil { + t.Fatalf("Config load error: %s %s", config, err) + } + events := make(chan event.Events) + defer close(events) + go func() { + ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) + }() + + ev := event.Events{ + // event with default ttl = 1s + &event.GaugeEvent{ + GMetricName: "foobar", + GValue: 200, + }, + // event with ttl = 2s from a mapping + &event.TimerEvent{ + TMetricName: "bazqux.main", + TValue: 42000, + }, + } + + var metrics []*dto.MetricFamily + var foobarValue *float64 + var bazquxValue *float64 + + // Step 1. Send events with statsd metrics. + // Send empty Events to wait for events are handled. + // saveLabelValues will use fake instant as a lastRegisteredAt time. + clock.ClockInstance.Instant = time.Unix(0, 0) + events <- ev + events <- event.Events{} + + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if foobarValue == nil || bazquxValue == nil { + t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered") + } + if *foobarValue != 200 { + t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue) + } + if *bazquxValue != 42 { + t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + } + + // Step 2. Increase Instant to emulate metrics expiration after 1s + clock.ClockInstance.Instant = time.Unix(1, 10) + clock.ClockInstance.TickerCh <- time.Unix(0, 0) + events <- event.Events{} + + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if foobarValue != nil { + t.Fatalf("Gauge `foobar` should be expired") + } + if bazquxValue == nil { + t.Fatalf("Summary `bazqux` should be gathered") + } + if *bazquxValue != 42 { + t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + } + + // Step 3. Increase Instant to emulate metrics expiration after 2s + clock.ClockInstance.Instant = time.Unix(2, 200) + clock.ClockInstance.TickerCh <- time.Unix(0, 0) + events <- event.Events{} + + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if bazquxValue != nil { + t.Fatalf("Summary `bazqux` should be expired") + } + if foobarValue != nil { + t.Fatalf("Gauge `foobar` should not be gathered after expiration") + } +} + +// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels. +// Method returns a value or nil if metric is not found. +func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 { + var metricFamily *dto.MetricFamily + for _, m := range metrics { + if *m.Name == name { + metricFamily = m + break + } + } + if metricFamily == nil { + return nil + } + + var metric *dto.Metric + labelStr := fmt.Sprintf("%v", labels) + for _, m := range metricFamily.Metric { + l := labelPairsAsLabels(m.GetLabel()) + ls := fmt.Sprintf("%v", l) + if labelStr == ls { + metric = m + break + } + } + if metric == nil { + return nil + } + + var value float64 + if metric.Gauge != nil { + value = metric.Gauge.GetValue() + return &value + } + if metric.Counter != nil { + value = metric.Counter.GetValue() + return &value + } + if metric.Histogram != nil { + value = metric.Histogram.GetSampleSum() + return &value + } + if metric.Summary != nil { + value = metric.Summary.GetSampleSum() + return &value + } + if metric.Untyped != nil { + value = metric.Untyped.GetValue() + return &value + } + panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric)) +} + +func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) { + labels = prometheus.Labels{} + for _, pair := range pairs { + if pair.Name == nil { + continue + } + value := "" + if pair.Value != nil { + value = *pair.Value + } + labels[*pair.Name] = value + } + return +} diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 675d52c..900941b 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -50,7 +50,7 @@ func benchmarkUDPListener(times int, b *testing.B) { for i := 0; i < times; i++ { for _, line := range bytesInput { - l.HandlePacket([]byte(line), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) + l.HandlePacket([]byte(line)) } } } @@ -142,7 +142,7 @@ mappings: b.Fatalf("Config load error: %s %s", config, err) } - ex := exporter.NewExporter(testMapper, log.NewNopLogger()) + ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) for i := 0; i < b.N; i++ { ec := make(chan event.Events, 1000) go func() { @@ -152,6 +152,6 @@ mappings: close(ec) }() - ex.Listen(ec, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(ec) } } diff --git a/main.go b/main.go index e1113d1..d85d04a 100644 --- a/main.go +++ b/main.go @@ -32,11 +32,11 @@ import ( "github.com/prometheus/common/version" "gopkg.in/alecthomas/kingpin.v2" + "github.com/prometheus/statsd_exporter/pkg/address" "github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" - "github.com/prometheus/statsd_exporter/pkg/util" ) const ( @@ -58,10 +58,11 @@ var ( Help: "Number of times events were flushed to exporter", }, ) - eventsUnmapped = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "statsd_exporter_events_unmapped_total", - Help: "The total number of StatsD events no mapping was found for.", - }) + eventsUnmapped = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_unmapped_total", + Help: "The total number of StatsD events no mapping was found for.", + }) udpPackets = prometheus.NewCounter( prometheus.CounterOpts{ Name: "statsd_exporter_udp_packets_total", @@ -294,7 +295,7 @@ func main() { eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed) if *statsdListenUDP != "" { - udpListenAddr, err := util.UDPAddrFromString(*statsdListenUDP) + udpListenAddr, err := address.UDPAddrFromString(*statsdListenUDP) if err != nil { level.Error(logger).Log("msg", "invalid UDP listen address", "address", *statsdListenUDP, "error", err) os.Exit(1) @@ -313,12 +314,24 @@ func main() { } } - ul := &listener.StatsDUDPListener{Conn: uconn, EventHandler: eventQueue, Logger: logger} - go ul.Listen(udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) + ul := &listener.StatsDUDPListener{ + Conn: uconn, + EventHandler: eventQueue, + Logger: logger, + UDPPackets: udpPackets, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + } + + go ul.Listen() } if *statsdListenTCP != "" { - tcpListenAddr, err := util.TCPAddrFromString(*statsdListenTCP) + tcpListenAddr, err := address.TCPAddrFromString(*statsdListenTCP) if err != nil { level.Error(logger).Log("msg", "invalid TCP listen address", "address", *statsdListenUDP, "error", err) os.Exit(1) @@ -330,8 +343,22 @@ func main() { } defer tconn.Close() - tl := &listener.StatsDTCPListener{Conn: tconn, EventHandler: eventQueue, Logger: logger} - go tl.Listen(linesReceived, eventsFlushed, tcpConnections, tcpErrors, tcpLineTooLong, *sampleErrors, samplesReceived, tagErrors, tagsReceived) + tl := &listener.StatsDTCPListener{ + Conn: tconn, + EventHandler: eventQueue, + Logger: logger, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, + } + + go tl.Listen() } if *statsdListenUnixgram != "" { @@ -360,7 +387,7 @@ func main() { } ul := &listener.StatsDUnixgramListener{Conn: uxgconn, EventHandler: eventQueue, Logger: logger} - go ul.Listen(unixgramPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) + go ul.Listen() // if it's an abstract unix domain socket, it won't exist on fs // so we can't chmod it either @@ -403,12 +430,12 @@ func main() { go configReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption) - exporter := exporter.NewExporter(mapper, logger) + exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) - go exporter.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + go exporter.Listen(events) <-signals } diff --git a/pkg/util/util.go b/pkg/address/address.go similarity index 99% rename from pkg/util/util.go rename to pkg/address/address.go index 19dffdc..75f4736 100644 --- a/pkg/util/util.go +++ b/pkg/address/address.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package util +package address import ( "fmt" diff --git a/pkg/event/event.go b/pkg/event/event.go index 6e3d97c..bc22629 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -69,12 +69,14 @@ type EventQueue struct { C chan Events q Events m sync.Mutex - flushThreshold int flushTicker *time.Ticker + flushThreshold int + flushInterval time.Duration + eventsFlushed prometheus.Counter } type EventHandler interface { - Queue(event Events, eventsFlushed *prometheus.Counter) + Queue(event Events) } func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration, eventsFlushed prometheus.Counter) *EventQueue { @@ -84,38 +86,39 @@ func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duratio flushThreshold: flushThreshold, flushTicker: ticker, q: make([]Event, 0, flushThreshold), + eventsFlushed: eventsFlushed, } go func() { for { <-ticker.C - eq.Flush(eventsFlushed) + eq.Flush() } }() return eq } -func (eq *EventQueue) Queue(events Events, eventsFlushed *prometheus.Counter) { +func (eq *EventQueue) Queue(events Events) { eq.m.Lock() defer eq.m.Unlock() for _, e := range events { eq.q = append(eq.q, e) if len(eq.q) >= eq.flushThreshold { - eq.FlushUnlocked(*eventsFlushed) + eq.FlushUnlocked() } } } -func (eq *EventQueue) Flush(eventsFlushed prometheus.Counter) { +func (eq *EventQueue) Flush() { eq.m.Lock() defer eq.m.Unlock() - eq.FlushUnlocked(eventsFlushed) + eq.FlushUnlocked() } -func (eq *EventQueue) FlushUnlocked(eventsFlushed prometheus.Counter) { +func (eq *EventQueue) FlushUnlocked() { eq.C <- eq.q eq.q = make([]Event, 0, cap(eq.q)) - eventsFlushed.Inc() + eq.eventsFlushed.Inc() } func (eq *EventQueue) Len() int { @@ -129,6 +132,6 @@ type UnbufferedEventHandler struct { C chan Events } -func (ueh *UnbufferedEventHandler) Queue(events Events, eventsFlushed *prometheus.Counter) { +func (ueh *UnbufferedEventHandler) Queue(events Events) { ueh.C <- events } diff --git a/event_test.go b/pkg/event/event_test.go similarity index 79% rename from event_test.go rename to pkg/event/event_test.go index 425edce..d458ca5 100644 --- a/event_test.go +++ b/pkg/event/event_test.go @@ -11,23 +11,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package event import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/clock" - "github.com/prometheus/statsd_exporter/pkg/event" +) + +var eventsFlushed = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_event_queue_flushed_total", + Help: "Number of times events were flushed to exporter", + }, ) func TestEventThresholdFlush(t *testing.T) { - c := make(chan event.Events, 100) + c := make(chan Events, 100) // We're not going to flush during this test, so the duration doesn't matter. - eq := event.NewEventQueue(c, 5, time.Second, eventsFlushed) - e := make(event.Events, 13) + eq := NewEventQueue(c, 5, time.Second, eventsFlushed) + e := make(Events, 13) go func() { - eq.Queue(e, &eventsFlushed) + eq.Queue(e) }() batch := <-c @@ -52,10 +59,10 @@ func TestEventIntervalFlush(t *testing.T) { } clock.ClockInstance.Instant = time.Unix(0, 0) - c := make(chan event.Events, 100) - eq := event.NewEventQueue(c, 1000, time.Second*1000, eventsFlushed) - e := make(event.Events, 10) - eq.Queue(e, &eventsFlushed) + c := make(chan Events, 100) + eq := NewEventQueue(c, 1000, time.Second*1000, eventsFlushed) + e := make(Events, 10) + eq.Queue(e) if eq.Len() != 10 { t.Fatal("Expected 10 events to be queued, but got", eq.Len()) diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index cfa5aae..66876cc 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -32,15 +32,20 @@ const ( ) type Exporter struct { - Mapper *mapper.MetricMapper - Registry *registry.Registry - Logger log.Logger + Mapper *mapper.MetricMapper + Registry *registry.Registry + Logger log.Logger + EventsActions *prometheus.CounterVec + EventsUnmapped prometheus.Counter + ErrorEventStats *prometheus.CounterVec + EventStats *prometheus.CounterVec + ConflictingEventStats *prometheus.CounterVec + MetricsCount *prometheus.GaugeVec } // Listen handles all events sent to the given channel sequentially. It // terminates when the channel is closed. -func (b *Exporter) Listen(e <-chan event.Events, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, - errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) { +func (b *Exporter) Listen(e <-chan event.Events) { removeStaleMetricsTicker := clock.NewTicker(time.Second) @@ -55,15 +60,14 @@ func (b *Exporter) Listen(e <-chan event.Events, eventsActions *prometheus.Count return } for _, event := range events { - b.handleEvent(event, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + b.handleEvent(event) } } } } // handleEvent processes a single Event according to the configured mapping. -func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, - errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) { +func (b *Exporter) handleEvent(thisEvent event.Event) { mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType()) if mapping == nil { @@ -74,7 +78,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus. } if mapping.Action == mapper.ActionTypeDrop { - eventsActions.WithLabelValues("drop").Inc() + b.EventsActions.WithLabelValues("drop").Inc() return } @@ -89,16 +93,16 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus. if present { if mapping.Name == "" { level.Debug(b.Logger).Log("msg", "The mapping generates an empty metric name", "metric_name", thisEvent.MetricName(), "match", mapping.Match) - errorEventStats.WithLabelValues("empty_metric_name").Inc() + b.ErrorEventStats.WithLabelValues("empty_metric_name").Inc() return } metricName = mapper.EscapeMetricName(mapping.Name) for label, value := range labels { prometheusLabels[label] = value } - eventsActions.WithLabelValues(string(mapping.Action)).Inc() + b.EventsActions.WithLabelValues(string(mapping.Action)).Inc() } else { - eventsUnmapped.Inc() + b.EventsUnmapped.Inc() metricName = mapper.EscapeMetricName(thisEvent.MetricName()) } @@ -108,21 +112,21 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus. // will cause the exporter to panic. Instead we will warn and continue to the next event. if thisEvent.Value() < 0.0 { level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value()) - errorEventStats.WithLabelValues("illegal_negative_counter").Inc() + b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc() return } - counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, metricsCount) + counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { counter.Add(thisEvent.Value()) - eventStats.WithLabelValues("counter").Inc() + b.EventStats.WithLabelValues("counter").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("counter").Inc() + b.ConflictingEventStats.WithLabelValues("counter").Inc() } case *event.GaugeEvent: - gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, metricsCount) + gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { if ev.GRelative { @@ -130,10 +134,10 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus. } else { gauge.Set(thisEvent.Value()) } - eventStats.WithLabelValues("gauge").Inc() + b.EventStats.WithLabelValues("gauge").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("gauge").Inc() + b.ConflictingEventStats.WithLabelValues("gauge").Inc() } case *event.TimerEvent: @@ -147,23 +151,23 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus. switch t { case mapper.TimerTypeHistogram: - histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, metricsCount) + histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { histogram.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond - eventStats.WithLabelValues("timer").Inc() + b.EventStats.WithLabelValues("timer").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("timer").Inc() + b.ConflictingEventStats.WithLabelValues("timer").Inc() } case mapper.TimerTypeDefault, mapper.TimerTypeSummary: - summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, metricsCount) + summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { summary.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond - eventStats.WithLabelValues("timer").Inc() + b.EventStats.WithLabelValues("timer").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("timer").Inc() + b.ConflictingEventStats.WithLabelValues("timer").Inc() } default: @@ -173,14 +177,20 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus. default: level.Debug(b.Logger).Log("msg", "Unsupported event type") - eventStats.WithLabelValues("illegal").Inc() + b.EventStats.WithLabelValues("illegal").Inc() } } -func NewExporter(mapper *mapper.MetricMapper, logger log.Logger) *Exporter { +func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter { return &Exporter{ - Mapper: mapper, - Registry: registry.NewRegistry(mapper), - Logger: logger, + Mapper: mapper, + Registry: registry.NewRegistry(mapper), + Logger: logger, + EventsActions: eventsActions, + EventsUnmapped: eventsUnmapped, + ErrorEventStats: errorEventStats, + EventStats: eventStats, + ConflictingEventStats: conflictingEventStats, + MetricsCount: metricsCount, } } diff --git a/exporter_test.go b/pkg/exporter/exporter_test.go similarity index 78% rename from exporter_test.go rename to pkg/exporter/exporter_test.go index e124aac..86cb040 100644 --- a/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package exporter import ( "fmt" @@ -25,13 +25,133 @@ import ( "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/event" - "github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/registry" ) +var ( + eventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_total", + Help: "The total number of StatsD events seen.", + }, + []string{"type"}, + ) + eventsFlushed = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_event_queue_flushed_total", + Help: "Number of times events were flushed to exporter", + }, + ) + eventsUnmapped = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_unmapped_total", + Help: "The total number of StatsD events no mapping was found for.", + }) + udpPackets = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_udp_packets_total", + Help: "The total number of StatsD packets received over UDP.", + }, + ) + tcpConnections = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connections_total", + Help: "The total number of TCP connections handled.", + }, + ) + tcpErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connection_errors_total", + Help: "The number of errors encountered reading from TCP.", + }, + ) + tcpLineTooLong = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_too_long_lines_total", + Help: "The number of lines discarded due to being too long.", + }, + ) + unixgramPackets = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_unixgram_packets_total", + Help: "The total number of StatsD packets received over Unixgram.", + }, + ) + linesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_lines_total", + Help: "The total number of StatsD lines received.", + }, + ) + samplesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_samples_total", + Help: "The total number of StatsD samples received.", + }, + ) + sampleErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_sample_errors_total", + Help: "The total number of errors parsing StatsD samples.", + }, + []string{"reason"}, + ) + tagsReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tags_total", + Help: "The total number of DogStatsD tags processed.", + }, + ) + tagErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tag_errors_total", + Help: "The number of errors parsing DogStatsD tags.", + }, + ) + configLoads = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_config_reloads_total", + Help: "The number of configuration reloads.", + }, + []string{"outcome"}, + ) + mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "statsd_exporter_loaded_mappings", + Help: "The current number of configured metric mappings.", + }) + conflictingEventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_conflict_total", + Help: "The total number of StatsD events with conflicting names.", + }, + []string{"type"}, + ) + errorEventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_error_total", + Help: "The total number of StatsD events discarded due to errors.", + }, + []string{"reason"}, + ) + eventsActions = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_actions_total", + Help: "The total number of StatsD events by action.", + }, + []string{"action"}, + ) + metricsCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "statsd_exporter_metrics_total", + Help: "The total number of metrics.", + }, + []string{"type"}, + ) +) + // TestNegativeCounter validates when we send a negative // number to a counter that we no longer panic the Exporter Listener. func TestNegativeCounter(t *testing.T) { @@ -64,8 +184,8 @@ func TestNegativeCounter(t *testing.T) { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) updated := getTelemetryCounterValue(errorCounter) if updated-prev != 1 { @@ -145,8 +265,8 @@ mappings: t.Fatalf("Config load error: %s %s", config, err) } - ex := exporter.NewExporter(testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { @@ -203,8 +323,8 @@ mappings: t.Fatalf("Config load error: %s %s", config, err) } - ex := exporter.NewExporter(testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { @@ -418,8 +538,8 @@ mappings: events <- s.in close(events) }() - ex := exporter.NewExporter(testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { @@ -473,8 +593,8 @@ mappings: errorCounter := errorEventStats.WithLabelValues("empty_metric_name") prev := getTelemetryCounterValue(errorCounter) - ex := exporter.NewExporter(testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) updated := getTelemetryCounterValue(errorCounter) if updated-prev != 1 { @@ -498,9 +618,33 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { ueh := &event.UnbufferedEventHandler{C: events} go func() { - for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{listener.StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} { + for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + UDPPackets: udpPackets, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + }, &mockStatsDTCPListener{listener.StatsDTCPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, + }, log.NewNopLogger()}} { l.SetEventHandler(ueh) - l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) + l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid")) } close(events) }() @@ -508,8 +652,8 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) } // In the case of someone starting the statsd exporter with no mapping file specified @@ -522,8 +666,8 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) }() name := "default_foo" @@ -566,9 +710,9 @@ func TestHistogramUnits(t *testing.T) { go func() { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Mapper.Defaults.TimerType = mapper.TimerTypeHistogram - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) }() // Synchronously send a statsd event to wait for handleEvent execution. @@ -605,8 +749,8 @@ func TestCounterIncrement(t *testing.T) { go func() { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) }() // Synchronously send a statsd event to wait for handleEvent execution. @@ -647,7 +791,7 @@ func TestCounterIncrement(t *testing.T) { } type statsDPacketHandler interface { - HandlePacket(packet []byte, udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) + HandlePacket(packet []byte) SetEventHandler(eh event.EventHandler) } @@ -656,7 +800,7 @@ type mockStatsDTCPListener struct { log.Logger } -func (ml *mockStatsDTCPListener) HandlePacket(packet []byte, udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { +func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) { // Forcing IPv4 because the TravisCI build environment does not have IPv6 // addresses. lc, err := net.ListenTCP("tcp4", nil) @@ -684,7 +828,7 @@ func (ml *mockStatsDTCPListener) HandlePacket(packet []byte, udpPackets promethe if err != nil { panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) } - ml.HandleConn(sc, linesReceived, eventsFlushed, tcpConnections, tcpErrors, tcpLineTooLong, sampleErrors, samplesReceived, tagErrors, tagsReceived) + ml.HandleConn(sc) } // TestTtlExpiration validates expiration of time series. @@ -714,8 +858,8 @@ mappings: events := make(chan event.Events) defer close(events) go func() { - ex := exporter.NewExporter(testMapper, log.NewNopLogger()) - ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) }() ev := event.Events{ diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index 9bcd16f..9b9eb7d 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -28,16 +28,23 @@ import ( ) type StatsDUDPListener struct { - Conn *net.UDPConn - EventHandler event.EventHandler - Logger log.Logger + Conn *net.UDPConn + EventHandler event.EventHandler + Logger log.Logger + UDPPackets prometheus.Counter + LinesReceived prometheus.Counter + EventsFlushed prometheus.Counter + SampleErrors prometheus.CounterVec + SamplesReceived prometheus.Counter + TagErrors prometheus.Counter + TagsReceived prometheus.Counter } func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { l.EventHandler = eh } -func (l *StatsDUDPListener) Listen(udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { +func (l *StatsDUDPListener) Listen() { buf := make([]byte, 65535) for { n, _, err := l.Conn.ReadFromUDP(buf) @@ -50,30 +57,39 @@ func (l *StatsDUDPListener) Listen(udpPackets prometheus.Counter, linesReceived level.Error(l.Logger).Log("error", err) return } - l.HandlePacket(buf[0:n], udpPackets, linesReceived, eventsFlushed, sampleErrors, samplesReceived, tagErrors, tagsReceived) + l.HandlePacket(buf[0:n]) } } -func (l *StatsDUDPListener) HandlePacket(packet []byte, udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - udpPackets.Inc() +func (l *StatsDUDPListener) HandlePacket(packet []byte) { + l.UDPPackets.Inc() lines := strings.Split(string(packet), "\n") for _, line := range lines { - linesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(line, sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) + l.LinesReceived.Inc() + l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } } type StatsDTCPListener struct { - Conn *net.TCPListener - EventHandler event.EventHandler - Logger log.Logger + Conn *net.TCPListener + EventHandler event.EventHandler + Logger log.Logger + LinesReceived prometheus.Counter + EventsFlushed prometheus.Counter + SampleErrors prometheus.CounterVec + SamplesReceived prometheus.Counter + TagErrors prometheus.Counter + TagsReceived prometheus.Counter + TCPConnections prometheus.Counter + TCPErrors prometheus.Counter + TCPLineTooLong prometheus.Counter } func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) { l.EventHandler = eh } -func (l *StatsDTCPListener) Listen(linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, tcpConnections prometheus.Counter, tcpErrors prometheus.Counter, tcpLineTooLong prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { +func (l *StatsDTCPListener) Listen() { for { c, err := l.Conn.AcceptTCP() if err != nil { @@ -85,46 +101,53 @@ func (l *StatsDTCPListener) Listen(linesReceived prometheus.Counter, eventsFlush level.Error(l.Logger).Log("msg", "AcceptTCP failed", "error", err) os.Exit(1) } - go l.HandleConn(c, linesReceived, eventsFlushed, tcpConnections, tcpErrors, tcpLineTooLong, sampleErrors, samplesReceived, tagErrors, tagsReceived) + go l.HandleConn(c) } } -func (l *StatsDTCPListener) HandleConn(c *net.TCPConn, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, tcpConnections prometheus.Counter, tcpErrors prometheus.Counter, tcpLineTooLong prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { +func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) { defer c.Close() - tcpConnections.Inc() + l.TCPConnections.Inc() r := bufio.NewReader(c) for { line, isPrefix, err := r.ReadLine() if err != nil { if err != io.EOF { - tcpErrors.Inc() + l.TCPErrors.Inc() level.Debug(l.Logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err) } break } if isPrefix { - tcpLineTooLong.Inc() + l.TCPLineTooLong.Inc() level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) break } - linesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(string(line), sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) + l.LinesReceived.Inc() + l.EventHandler.Queue(pkgLine.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } } type StatsDUnixgramListener struct { - Conn *net.UnixConn - EventHandler event.EventHandler - Logger log.Logger + Conn *net.UnixConn + EventHandler event.EventHandler + Logger log.Logger + UnixgramPackets prometheus.Counter + LinesReceived prometheus.Counter + EventsFlushed prometheus.Counter + SampleErrors prometheus.CounterVec + SamplesReceived prometheus.Counter + TagErrors prometheus.Counter + TagsReceived prometheus.Counter } func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) { l.EventHandler = eh } -func (l *StatsDUnixgramListener) Listen(unixgramPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { +func (l *StatsDUnixgramListener) Listen() { buf := make([]byte, 65535) for { n, _, err := l.Conn.ReadFromUnix(buf) @@ -137,15 +160,15 @@ func (l *StatsDUnixgramListener) Listen(unixgramPackets prometheus.Counter, line level.Error(l.Logger).Log(err) os.Exit(1) } - l.HandlePacket(buf[:n], unixgramPackets, linesReceived, eventsFlushed, sampleErrors, samplesReceived, tagErrors, tagsReceived) + l.HandlePacket(buf[:n]) } } -func (l *StatsDUnixgramListener) HandlePacket(packet []byte, unixgramPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - unixgramPackets.Inc() +func (l *StatsDUnixgramListener) HandlePacket(packet []byte) { + l.UnixgramPackets.Inc() lines := strings.Split(string(packet), "\n") for _, line := range lines { - linesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(line, sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) + l.LinesReceived.Inc() + l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } }