diff --git a/pkg/line/line.go b/pkg/line/line.go index 949a5ea..4abebca 100644 --- a/pkg/line/line.go +++ b/pkg/line/line.go @@ -211,7 +211,7 @@ func (p *Parser) LineToEvents(line string, sampleErrors prometheus.CounterVec, s elements := strings.SplitN(line, ":", 2) if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { sampleErrors.WithLabelValues("malformed_line").Inc() - level.Debug(logger).Log("msg", "Bad line from StatsD", "line", line) + level.Debug(logger).Log("msg", "Bad line", "line", line) return events } @@ -223,12 +223,17 @@ func (p *Parser) LineToEvents(line string, sampleErrors prometheus.CounterVec, s // don't allow mixed tagging styles sampleErrors.WithLabelValues("mixed_tagging_styles").Inc() - level.Debug(logger).Log("msg", "Bad line (multiple tagging styles) from StatsD", "line", line) + level.Debug(logger).Log("msg", "Bad line (multiple tagging styles)", "line", line) return events } var samples []string lineParts := strings.SplitN(elements[1], "|", 3) + if len(lineParts) < 2 { + sampleErrors.WithLabelValues("not_enough_parts_after_colon").Inc() + level.Debug(logger).Log("msg", "Bad line: not enough '|'-delimited parts after first ':'", "line", line) + return events + } if strings.Contains(lineParts[0], ":") { // handle DogStatsD extended aggregation isValidAggType := false @@ -251,7 +256,7 @@ func (p *Parser) LineToEvents(line string, sampleErrors prometheus.CounterVec, s samples = aggLines } else { sampleErrors.WithLabelValues("invalid_extended_aggregate_type").Inc() - level.Debug(logger).Log("msg", "Bad line (invalid extended aggregate type) from StatsD", "line", line) + level.Debug(logger).Log("msg", "Bad line (invalid extended aggregate type)", "line", line) return events } } else if usingDogStatsDTags { diff --git a/pkg/line/line_test.go b/pkg/line/line_test.go index 970320c..305d790 100644 --- a/pkg/line/line_test.go +++ b/pkg/line/line_test.go @@ -823,6 +823,12 @@ func TestLineToEvents(t *testing.T) { }, }, }, + "invalid event split over lines part 1": { + in: "karafka.consumer.consume.cpu_idle_second: 0.111090 -0.055903 -0.195390 ( 2.419002)", + }, + "invalid event split over lines part 2": { + in: "|h|#consumer:Kafka::SharedConfigurationConsumer,topic:shared_configuration_update,partition:1,consumer_group:tc_rc_us", + }, } parser := NewParser()