From 825b734b3e45e421fa9ecec5030f7f02f139f782 Mon Sep 17 00:00:00 2001 From: Wangchong Zhou Date: Thu, 19 Jul 2018 14:57:36 -0700 Subject: [PATCH] Implement "fsm" match type Signed-off-by: Wangchong Zhou --- pkg/mapper/mapper.go | 175 +++++++++++++++++- pkg/mapper/mapper_test.go | 363 ++++++++++++++++++++++++++++++++++++++ pkg/mapper/match.go | 3 + 3 files changed, 532 insertions(+), 9 deletions(-) diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 973607c..c37a783 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -14,13 +14,17 @@ package mapper import ( + "bufio" "fmt" "io/ioutil" + "os" "regexp" + "strconv" "strings" "sync" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" yaml "gopkg.in/yaml.v2" ) @@ -28,26 +32,41 @@ var ( statsdMetricRE = `[a-zA-Z_](-?[a-zA-Z0-9_])+` templateReplaceRE = `(\$\{?\d+\}?)` - metricLineRE = regexp.MustCompile(`^(\*\.|` + statsdMetricRE + `\.)+(\*|` + statsdMetricRE + `)$`) - metricNameRE = regexp.MustCompile(`^([a-zA-Z_]|` + templateReplaceRE + `)([a-zA-Z0-9_]|` + templateReplaceRE + `)*$`) - labelNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]+$`) + metricLineRE = regexp.MustCompile(`^(\*\.|` + statsdMetricRE + `\.)+(\*|` + statsdMetricRE + `)$`) + metricNameRE = regexp.MustCompile(`^([a-zA-Z_]|` + templateReplaceRE + `)([a-zA-Z0-9_]|` + templateReplaceRE + `)*$`) + labelNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]+$`) + labelValueExpansionRE = regexp.MustCompile(`\${?(\d+)}?`) ) type mapperConfigDefaults struct { - TimerType TimerType `yaml:"timer_type"` - Buckets []float64 `yaml:"buckets"` - Quantiles []metricObjective `yaml:"quantiles"` - MatchType MatchType `yaml:"match_type"` + TimerType TimerType `yaml:"timer_type"` + Buckets []float64 `yaml:"buckets"` + Quantiles []metricObjective `yaml:"quantiles"` + MatchType MatchType `yaml:"match_type"` + DumpFSM string `yaml:"dump_fsm"` + FSMFallback MatchType `yaml:"fsm_fallback"` +} + +type mappingState struct { + transitions map[string]*mappingState + // result is nil unless there's a metric ends with this state + result *MetricMapping } type MetricMapper struct { Defaults mapperConfigDefaults `yaml:"defaults"` Mappings []MetricMapping `yaml:"mappings"` + FSM *mappingState mutex sync.Mutex MappingsCount prometheus.Gauge } +type labelFormatter struct { + captureIdx int + fmtString string +} + type matchMetricType string type MetricMapping struct { @@ -55,6 +74,7 @@ type MetricMapping struct { Name string `yaml:"name"` regex *regexp.Regexp Labels prometheus.Labels `yaml:"labels"` + LabelsFormatter map[string]labelFormatter TimerType TimerType `yaml:"timer_type"` Buckets []float64 `yaml:"buckets"` Quantiles []metricObjective `yaml:"quantiles"` @@ -94,7 +114,14 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { n.Defaults.MatchType = MatchTypeGlob } + maxPossibleTransitions := len(n.Mappings) + + n.FSM = &mappingState{} + n.FSM.transitions = make(map[string]*mappingState, maxPossibleTransitions) + for i := range n.Mappings { + maxPossibleTransitions-- + currentMapping := &n.Mappings[i] // check that label is correct @@ -120,7 +147,57 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { currentMapping.Action = ActionTypeMap } - if currentMapping.MatchType == MatchTypeGlob { + if currentMapping.MatchType == MatchTypeFSM { + // first split by "." + matchFields := strings.Split(currentMapping.Match, ".") + // fill into our FSM + root := n.FSM + captureCount := 0 + for i, field := range matchFields { + state, prs := root.transitions[field] + if !prs { + state = &mappingState{} + (*state).transitions = make(map[string]*mappingState, maxPossibleTransitions) + root.transitions[field] = state + // if this is last field, set result to currentMapping instance + if i == len(matchFields)-1 { + root.transitions[field].result = currentMapping + } + } + if field == "*" { + captureCount++ + } + + // goto next state + root = state + } + currentLabelFormatter := make(map[string]labelFormatter, captureCount) + for label, valueExpr := range currentMapping.Labels { + matches := labelValueExpansionRE.FindAllStringSubmatch(valueExpr, -1) + if len(matches) == 0 { + // if no regex expansion found, keep it as it is + currentLabelFormatter[label] = labelFormatter{captureIdx: -1, fmtString: valueExpr} + continue + } else if len(matches) > 1 { + return fmt.Errorf("multiple captures is not supported in FSM matching type") + } + var valueFormatter string + idx, err := strconv.Atoi(matches[0][1]) + if err != nil { + return fmt.Errorf("invalid label value expression: %s", valueExpr) + } + if idx > captureCount || idx < 1 { + // index larger than captured count, replace all expansion with empty string + valueFormatter = labelValueExpansionRE.ReplaceAllString(valueExpr, "") + idx = 0 + } else { + valueFormatter = labelValueExpansionRE.ReplaceAllString(valueExpr, "%s") + } + currentLabelFormatter[label] = labelFormatter{captureIdx: idx - 1, fmtString: valueFormatter} + } + currentMapping.LabelsFormatter = currentLabelFormatter + } + if currentMapping.MatchType == MatchTypeGlob || n.Defaults.FSMFallback == MatchTypeGlob { if !metricLineRE.MatchString(currentMapping.Match) { return fmt.Errorf("invalid match: %s", currentMapping.Match) } @@ -133,7 +210,7 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { } else { currentMapping.regex = regex } - } else { + } else if currentMapping.MatchType == MatchTypeRegex || n.Defaults.FSMFallback == MatchTypeRegex { if regex, err := regexp.Compile(currentMapping.Match); err != nil { return fmt.Errorf("invalid regex %s in mapping: %v", currentMapping.Match, err) } else { @@ -154,12 +231,18 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { } } + if len(n.Defaults.DumpFSM) > 0 { + m.dumpFSM(n.Defaults.DumpFSM, n.FSM) + } m.mutex.Lock() defer m.mutex.Unlock() m.Defaults = n.Defaults m.Mappings = n.Mappings + if len(n.FSM.transitions) > 0 { + m.FSM = n.FSM + } if m.MappingsCount != nil { m.MappingsCount.Set(float64(len(n.Mappings))) @@ -168,6 +251,35 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { return nil } +func (m *MetricMapper) dumpFSM(fileName string, root *mappingState) { + log.Infoln("Start dumping FSM to", fileName) + idx := 0 + states := make(map[int]*mappingState) + states[idx] = root + + f, _ := os.Create(fileName) + w := bufio.NewWriter(f) + w.WriteString("digraph g {\n") + w.WriteString("rankdir=LR\n") // make it vertical + w.WriteString("node [ label=\"\",style=filled,fillcolor=white,shape=circle ]\n") // remove label of node + + for idx < len(states) { + for field, transition := range states[idx].transitions { + states[len(states)] = transition + w.WriteString(fmt.Sprintf("%d -> %d [label = \"%s\"];\n", idx, len(states)-1, field)) + if transition.transitions == nil || len(transition.transitions) == 0 { + w.WriteString(fmt.Sprintf("%d [color=\"#82B366\",fillcolor=\"#D5E8D4\"];\n", len(states)-1)) + } + + } + idx++ + } + w.WriteString(fmt.Sprintf("0 [color=\"#D6B656\",fillcolor=\"#FFF2CC\"];\n")) + w.WriteString("}") + w.Flush() + log.Infoln("Finish dumping FSM") +} + func (m *MetricMapper) InitFromFile(fileName string) error { mappingStr, err := ioutil.ReadFile(fileName) if err != nil { @@ -177,6 +289,51 @@ func (m *MetricMapper) InitFromFile(fileName string) error { } func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricType) (*MetricMapping, prometheus.Labels, bool) { + if root := m.FSM; root != nil { + matchFields := strings.Split(statsdMetric, ".") + captures := make(map[int]string, len(matchFields)) + captureIdx := 0 + filedsCount := len(matchFields) + for i, field := range matchFields { + if root.transitions == nil { + break + } + state, prs := root.transitions[field] + if !prs { + state, prs = root.transitions["*"] + if !prs { + break + } + captures[captureIdx] = field + captureIdx++ + } + if state.result != nil && i == filedsCount-1 { + // format valueExpr + mapping := *state.result + labels := prometheus.Labels{} + for label := range mapping.Labels { + formatter := mapping.LabelsFormatter[label] + idx := formatter.captureIdx + var value string + if idx == -1 { + value = formatter.fmtString + } else { + value = fmt.Sprintf(formatter.fmtString, captures[idx]) + } + labels[label] = string(value) + } + return state.result, labels, true + } + root = state + } + + // if fsm_fallback is not defined, return immediately + if len(m.Defaults.FSMFallback) == 0 { + log.Infof("%s not matched by fsm\n", statsdMetric) + return nil, nil, false + } + } + m.mutex.Lock() defer m.mutex.Unlock() diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index 79b10df..0b74f1e 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -368,6 +368,30 @@ mappings: `, configBad: true, }, + //Config with multiple captures for fsm match type + { + config: `--- + mappings: + - match: "foo.*.*" + match_type: fsm + name: "foo" + labels: + bar: "$1-$2" + `, + configBad: true, + }, + //Config with non-numeric capture index for fsm match type + { + config: `--- + mappings: + - match: "foo.*.*" + match_type: fsm + name: "foo" + labels: + bar: "$a" + `, + configBad: true, + }, //Config with non-matched metric. { config: `--- @@ -518,6 +542,345 @@ mappings: } } +func TestFSMMatcher(t *testing.T) { + scenarios := []struct { + config string + configBad bool + mappings mappings + }{ + // Empty config. + {}, + // Config with several mapping definitions. + { + config: `--- +defaults: + match_type: "fsm" +mappings: +- match: test.dispatcher.*.*.* + name: "dispatch_events" + labels: + processor: "$1" + action: "$2" + result: "$3" + job: "test_dispatcher" +- match: test.my-dispatch-host01.name.dispatcher.*.*.* + name: "host_dispatch_events" + labels: + processor: "$1" + action: "$2" + result: "$3" + job: "test_dispatcher" +- match: request_time.*.*.*.*.*.*.*.*.*.*.*.* + name: "tyk_http_request" + labels: + method_and_path: "$1" + response_code: "$2" + apikey: "$3" + apiversion: "$4" + apiname: "$5" + apiid: "$6" + ipv4_t1: "$7" + ipv4_t2: "$8" + ipv4_t3: "$9" + ipv4_t4: "$10" + orgid: "$11" + oauthid: "$12" +- match: "*.*" + name: "catchall" + labels: + first: "$1" + second: "second_label_$2" + third: "$3" + job: "-" + `, + mappings: mappings{ + "test.dispatcher.FooProcessor.send.succeeded": { + name: "dispatch_events", + labels: map[string]string{ + "processor": "FooProcessor", + "action": "send", + "result": "succeeded", + "job": "test_dispatcher", + }, + }, + "test.my-dispatch-host01.name.dispatcher.FooProcessor.send.succeeded": { + name: "host_dispatch_events", + labels: map[string]string{ + "processor": "FooProcessor", + "action": "send", + "result": "succeeded", + "job": "test_dispatcher", + }, + }, + "request_time.get/threads/1/posts.200.00000000.nonversioned.discussions.a11bbcdf0ac64ec243658dc64b7100fb.172.20.0.1.12ba97b7eaa1a50001000001.": { + name: "tyk_http_request", + labels: map[string]string{ + "method_and_path": "get/threads/1/posts", + "response_code": "200", + "apikey": "00000000", + "apiversion": "nonversioned", + "apiname": "discussions", + "apiid": "a11bbcdf0ac64ec243658dc64b7100fb", + "ipv4_t1": "172", + "ipv4_t2": "20", + "ipv4_t3": "0", + "ipv4_t4": "1", + "orgid": "12ba97b7eaa1a50001000001", + "oauthid": "", + }, + }, + "foo.bar": { + name: "catchall", + labels: map[string]string{ + "first": "foo", + "second": "second_label_bar", + "third": "", + "job": "-", + }, + }, + "foo.bar.baz": {}, + }, + }, + // local match_type + { + config: `--- +mappings: +- match: test.dispatcher.*.*.* + name: "dispatch_events" + labels: + match_type: "fsm" + processor: "$1" + action: "$2" + result: "$3" + job: "test_dispatcher" +- match: test.my-dispatch-host01.name.dispatcher.*.*.* + name: "host_dispatch_events" + labels: + match_type: "fsm" + processor: "$1" + action: "$2" + result: "$3" + job: "test_dispatcher" +- match: request_time.*.*.*.*.*.*.*.*.*.*.*.* + name: "tyk_http_request" + labels: + match_type: "fsm" + method_and_path: "$1" + response_code: "$2" + apikey: "$3" + apiversion: "$4" + apiname: "$5" + apiid: "$6" + ipv4_t1: "$7" + ipv4_t2: "$8" + ipv4_t3: "$9" + ipv4_t4: "$10" + orgid: "$11" + oauthid: "$12" +- match: "*.*" + name: "catchall" + labels: + match_type: "fsm" + first: "$1" + second: "second_label_$2" + third: "$3" + job: "-" + `, + mappings: mappings{ + "test.dispatcher.FooProcessor.send.succeeded": { + name: "dispatch_events", + labels: map[string]string{ + "processor": "FooProcessor", + "action": "send", + "result": "succeeded", + "job": "test_dispatcher", + }, + }, + "test.my-dispatch-host01.name.dispatcher.FooProcessor.send.succeeded": { + name: "host_dispatch_events", + labels: map[string]string{ + "processor": "FooProcessor", + "action": "send", + "result": "succeeded", + "job": "test_dispatcher", + }, + }, + "request_time.get/threads/1/posts.200.00000000.nonversioned.discussions.a11bbcdf0ac64ec243658dc64b7100fb.172.20.0.1.12ba97b7eaa1a50001000001.": { + name: "tyk_http_request", + labels: map[string]string{ + "method_and_path": "get/threads/1/posts", + "response_code": "200", + "apikey": "00000000", + "apiversion": "nonversioned", + "apiname": "discussions", + "apiid": "a11bbcdf0ac64ec243658dc64b7100fb", + "ipv4_t1": "172", + "ipv4_t2": "20", + "ipv4_t3": "0", + "ipv4_t4": "1", + "orgid": "12ba97b7eaa1a50001000001", + "oauthid": "", + }, + }, + "foo.bar": { + name: "catchall", + labels: map[string]string{ + "first": "foo", + "second": "second_label_bar", + "third": "", + "job": "-", + }, + }, + "foo.bar.baz": {}, + }, + }, + } + + mapper := MetricMapper{} + for i, scenario := range scenarios { + err := mapper.InitFromYAMLString(scenario.config) + if err != nil && !scenario.configBad { + t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) + } + if err == nil && scenario.configBad { + t.Fatalf("%d. Expected bad config, but loaded ok: %s", i, scenario.config) + } + + var dummyMetricType MetricType = "" + for metric, mapping := range scenario.mappings { + m, labels, present := mapper.GetMapping(metric, dummyMetricType) + if present && mapping.name != "" && m.Name != mapping.name { + t.Fatalf("%d.%q: Expected name %v, got %v", i, metric, m.Name, mapping.name) + } + if mapping.notPresent && present { + t.Fatalf("%d.%q: Expected metric to not be present", i, metric) + } + if len(labels) != len(mapping.labels) { + t.Fatalf("%d.%q: Expected %d labels, got %d", i, metric, len(mapping.labels), len(labels)) + } + for label, value := range labels { + if mapping.labels[label] != value { + t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels) + } + } + + } + } +} + +func TestFSMMatcherFallbackRegex(t *testing.T) { + scenarios := []struct { + config string + configBad bool + mappings mappings + }{ + // Config with simple matcher as default mathcer and fallback_regex to false. + { + config: `--- +defaults: + match_type: "fsm" +mappings: +- match: client.*.request.duration + name: "request_size" + labels: + client: "$1" +- match: client.*.*.size + name: "request_response_size" + labels: + client: "$1" + direction: "$2" + `, + mappings: mappings{ + "client.a.request.duration": { + name: "request_size", + labels: map[string]string{ + "client": "a", + }, + }, + "client.a.request.size": {}, + "client.a.response.size": { + name: "request_response_size", + labels: map[string]string{ + "client": "a", + "direction": "response", + }, + }, + }, + }, + // Config with simple matcher as default mathcer and fallback_regex to true. + { + config: `--- +defaults: + match_type: "fsm" + fsm_fallback: "glob" +mappings: +- match: client.*.request.duration + name: "request_size" + labels: + client: "$1" +- match: client.*.*.size + name: "request_response_size" + labels: + client: "$1" + direction: "$2" + `, + mappings: mappings{ + "client.a.request.duration": { + name: "request_size", + labels: map[string]string{ + "client": "a", + }, + }, + "client.a.request.size": { + name: "request_response_size", + labels: map[string]string{ + "client": "a", + "direction": "request", + }, + }, + "client.a.response.size": { + name: "request_response_size", + labels: map[string]string{ + "client": "a", + "direction": "response", + }, + }, + }, + }, + } + + mapper := MetricMapper{} + for i, scenario := range scenarios { + err := mapper.InitFromYAMLString(scenario.config) + if err != nil && !scenario.configBad { + t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) + } + if err == nil && scenario.configBad { + t.Fatalf("%d. Expected bad config, but loaded ok: %s", i, scenario.config) + } + + var dummyMetricType MetricType = "" + for metric, mapping := range scenario.mappings { + m, labels, present := mapper.GetMapping(metric, dummyMetricType) + if present && mapping.name != "" && m.Name != mapping.name { + t.Fatalf("%d.%q: Expected name %v, got %v", i, metric, m.Name, mapping.name) + } + if mapping.notPresent && present { + t.Fatalf("%d.%q: Expected metric to not be present", i, metric) + } + if len(labels) != len(mapping.labels) { + t.Fatalf("%d.%q: Expected %d labels, got %d", i, metric, len(mapping.labels), len(labels)) + } + for label, value := range labels { + if mapping.labels[label] != value { + t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels) + } + } + + } + } +} + func TestAction(t *testing.T) { scenarios := []struct { config string diff --git a/pkg/mapper/match.go b/pkg/mapper/match.go index 12d5e8d..276b52a 100644 --- a/pkg/mapper/match.go +++ b/pkg/mapper/match.go @@ -20,6 +20,7 @@ type MatchType string const ( MatchTypeGlob MatchType = "glob" MatchTypeRegex MatchType = "regex" + MatchTypeFSM MatchType = "fsm" MatchTypeDefault MatchType = "" ) @@ -32,6 +33,8 @@ func (t *MatchType) UnmarshalYAML(unmarshal func(interface{}) error) error { switch MatchType(v) { case MatchTypeRegex: *t = MatchTypeRegex + case MatchTypeFSM: + *t = MatchTypeFSM case MatchTypeGlob, MatchTypeDefault: *t = MatchTypeGlob default: