diff --git a/tracer/Makefile b/tracer/Makefile new file mode 100644 index 0000000000..06e2f08108 --- /dev/null +++ b/tracer/Makefile @@ -0,0 +1,12 @@ +TEST_DATA = \ + logs/trace.latency.log + +all: + +logs/trace.latency.log: + mkdir -p logs; \ + GST_DEBUG="GST_TRACER:7" GST_TRACERS=latency GST_DEBUG_FILE=$@ \ + gst-launch-1.0 -q audiotestsrc num-buffers=10 wave=silence ! audioconvert ! autoaudiosink + +check: $(TEST_DATA) + python3 -m unittest discover tracer diff --git a/tracer/README b/tracer/README new file mode 100644 index 0000000000..f4c0e5dda5 --- /dev/null +++ b/tracer/README @@ -0,0 +1,78 @@ +# Add a python api for tracer analyzers + +The python framework will parse the tracer log and aggregate information. +the tool writer will subclass from the Analyzer class and override method like: + + 'pad_push_buffer_pre' + +There is one method for each hook. Each of those methods will receive the parse +log line. In addition the framework will offer some extra api to allow to e.g. +write: + + pad.name() # pad name + pad.parent().name() # element name + pad.peer().parent() # peer element + pad.parent().state() # element state + +If we don't have full loging, we'd like to print a warning once, but make this +non fatal if possible. E.g. if we don't have logging for +element_{add,remove}_pad, we might not be able to provide pad.parent(). + +A tool can replay the log multiple times. If it does, it won't work in +'streaming' mode though. Streaming mode can offer live stats. + +## TODO +Do we want to provide classes like GstBin, GstElement, GstPad, ... to aggregate +info. We'd need to also provide a way to e.g. add a GstLogAnalyzer that knows +about data from the log tracer and populates the classes. We need to be able to +build a pipeline of analyzers, e.g. the analyzer calls GstLogAnalzer in its +catch-all handler and then processes some events individually. + +Parse the tracer classes. Add helper that for numeric values extract them, and +aggregate min/max/avg. Consider other statistical information (std. deviation) +and provide a rolling average for live view. + +Think of how analyzer classes can be combined: +- we'd like to build tools that import other analyzer classes and chain the + processing. + +## Examples +### Sequence chart generator (mscgen) + +1.) Write file header + +2.) collect element order +Replay the log and use pad_link_pre to collect pad->peer_pad relationship. +Build a sequence of element names and write to msc file. + +3.) collect event processing +Replay the log and use pad_push_event_pre to output message lines to mscfile. + +4.) write footer and run the tool. + +## Latency stats + +1.) collect per sink-latencies and for each sink per source latencies +Calculate min, max, avg. Consider streaming interface, where we update the stats +e.g. once a sec + +2.) in non-streaming mode write final statistic + +## cpu load stats + +Like latency stats, for cpu load. Process cpu load + per thread cpu load. + +## top + +Combine various stats tools into one. + +# Improve tracers +## log +* the log tracer logs args and results into misc categories +* issues + * not easy/reliable to detect its output among other trace output + * not easy to match pre/post lines + * uses own do_log method, instead of gst_tracer_record_log + * if we also log structures, we need to log the 'function' as the + structure-name, also fields would be key=(type)val, instead of key=value + * if we switch to gst_tracer_record_log, we'd need to register 27 formats :/ diff --git a/tracer/tracer/analyzer.py b/tracer/tracer/analyzer.py new file mode 100644 index 0000000000..f9c4b92104 --- /dev/null +++ b/tracer/tracer/analyzer.py @@ -0,0 +1,116 @@ +import re +import sys + +from tracer.parser import Parser +from tracer.structure import Structure + +_SCOPE_RELATED_TO = { + 'GST_TRACER_VALUE_SCOPE_PAD': 'Pad', + 'GST_TRACER_VALUE_SCOPE_ELEMENT': 'Element', + 'GST_TRACER_VALUE_SCOPE_THREAD': 'Thread', + 'GST_TRACER_VALUE_SCOPE_PROCESS': 'Process', +} + +_NUMERIC_TYPES = ('int', 'uint', 'gint', 'guint', 'gint64', 'guint64') + +class Analyzer(object): + '''Base class for a gst tracer analyzer.''' + + def __init__(self, log): + self.log = log + self.records = {} + self.data = {} + + def handle_tracer_class(self, event): + s = Structure(event[Parser.F_MESSAGE]) + # TODO only for debugging + #print("tracer class:", repr(s)) + name = s.name[:-len('.class')] + record = { + 'class': s, + 'scope' : {}, + 'value' : {}, + } + self.records[name] = record + for k,v in s.values.items(): + if v.name == 'scope': + # TODO only for debugging + #print("scope: [%s]=%s" % (k, v)) + record['scope'][k] = v + elif v.name == 'value': + # skip non numeric and those without min/max + if (v.values['type'] in _NUMERIC_TYPES and + 'min' in v.values and 'max' in v.values): + # TODO only for debugging + #print("value: [%s]=%s" % (k, v)) + record['value'][k] = v + #else: + # TODO only for debugging + #print("skipping value: [%s]=%s" % (k, v)) + + + def handle_tracer_entry(self, event): + # use first field in message (structure-id) if none + if event[Parser.F_FUNCTION]: + # TODO: parse params in event[Parser.F_MESSAGE] + vmethod_name = event[Parser.F_FUNCTION] + else: + s = Structure(event[Parser.F_MESSAGE]) + vmethod_name = s.name + record = self.records.get(vmethod_name) + if record: + # aggregate event based on class + for sk,sv in record['scope'].items(): + # look up bin by scope (or create new) + key = (_SCOPE_RELATED_TO[sv.values['related-to']] + + ":" + str(s.values[sk])) + scope = self.data.get(key) + if not scope: + scope = {} + self.data[key] = scope + for vk,vv in record['value'].items(): + key = vmethod_name + "/" + vk + data = scope.get(key) + if not data: + data = { + 'num': 0, + 'sum': 0, + } + if 'max' in vv.values and 'min' in vv.values: + data['min'] = int(vv.values['max']) + data['max'] = int(vv.values['min']) + scope[key] = data + # update min/max/sum and count via value + dv = int(s.values[vk]) + data['num'] += 1 + data['sum'] += dv + if 'min' in data: + data['min'] = min(dv, data['min']) + if 'max' in data: + data['max'] = max(dv, data['max']) + + # TODO: check if self has a catch-all handler and call first (check this in init) + # - we can use this to chain filters, allthough the chained filter + # would be doing the same as below + # check if self['vmethod'] is a function, if so call + vmethod = getattr (self, vmethod_name, None) + if callable(vmethod): + vmethod (event) + + def is_tracer_class(self, event): + return (event[Parser.F_FILENAME] == 'gsttracerrecord.c' and + event[Parser.F_CATEGORY] == 'GST_TRACER' and + '.class' in event[Parser.F_MESSAGE]) + + def is_tracer_entry(self, event): + return (not event[Parser.F_LINE] and not event[Parser.F_FILENAME]) + + def run(self): + for event in self.log: + # check if it is a tracer.class or tracer event + if self.is_tracer_class(event): + self.handle_tracer_class(event) + elif self.is_tracer_entry(event): + self.handle_tracer_entry(event) + #else: + # print("unhandled:", repr(event)) diff --git a/tracer/tracer/analyzer_test.py b/tracer/tracer/analyzer_test.py new file mode 100644 index 0000000000..fac00ee7ca --- /dev/null +++ b/tracer/tracer/analyzer_test.py @@ -0,0 +1,18 @@ +import unittest + +from tracer.analyzer import Analyzer + +TRACER_CLASS = ('0:00:00.036373170', 1788, '0x23bca70', 'TRACE', 'GST_TRACER', 'gsttracerrecord.c', 110, 'gst_tracer_record_build_format', None, r'latency.class, src=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", sink=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", time=(structure)"value\\,\\ type\\=\\(type\\)guint64\\,\\ description\\=\\(string\\)\\"time\\\\\\ it\\\\\\ took\\\\\\ for\\\\\\ the\\\\\\ buffer\\\\\\ to\\\\\\ go\\\\\\ from\\\\\\ src\\\\\\ to\\\\\\ sink\\\\\\ ns\\"\\,\\ flags\\=\\(GstTracerValueFlags\\)GST_TRACER_VALUE_FLAGS_AGGREGATED\\,\\ min\\=\\(guint64\\)0\\,\\ max\\=\\(guint64\\)18446744073709551615\\;";') + +TRACER_ENTRY = ('0:00:00.142391137', 1788, '0x7f8a201056d0', 'TRACE', 'GST_TRACER', '', 0, '', None, r'latency, src=(string)source_src, sink=(string)pulsesink0_sink, time=(guint64)47091349;') + +class TestAnalyzer(unittest.TestCase): + + def test_detect_tracer_class(self): + a = Analyzer(None) + self.assertTrue(a.is_tracer_class(TRACER_CLASS)) + + def test_detect_tracer_entry(self): + a = Analyzer(None) + self.assertTrue(a.is_tracer_entry(TRACER_ENTRY)) + diff --git a/tracer/tracer/parser.py b/tracer/tracer/parser.py new file mode 100644 index 0000000000..eed54cd030 --- /dev/null +++ b/tracer/tracer/parser.py @@ -0,0 +1,86 @@ +import os +import re +import sys + +# new tracer class +# 0:00:00.041536066 1788 0x14b2150 TRACE GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(GType\)NULL\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;"; + +# tracer log entry +# 0:00:00.079422574 7664 0x238ac70 TRACE GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045; + +# from log tracer +# 0:00:00.460486001 18356 0x21de780 TRACE GST_ELEMENT_PADS :0:do_element_add_pad: 0:00:00.460483603, key=val, ... +def _log_line_regex(): + + # "0:00:00.777913000 " + TIME = r"(\d+:\d\d:\d\d\.\d+)\s+" + # "DEBUG " + LEVEL = "([A-Z]+)\s+" + # "0x8165430 " + THREAD = r"(0x[0-9a-f]+)\s+" + # "GST_REFCOUNTING ", "flacdec " + CATEGORY = "([A-Za-z0-9_-]+)\s+" + # " 3089 " + PID = r"(\d+)\s*" + FILENAME = r"([^:]*):" + LINE = r"(\d+):" + FUNCTION = r"([A-Za-z0-9_]*):" + # FIXME: When non-g(st)object stuff is logged with *_OBJECT (like + # buffers!), the address is printed *without* <> brackets! + OBJECT = "(?:<([^>]+)>)?" + MESSAGE = "(.+)" + + ANSI = "(?:\x1b\\[[0-9;]*m\\s*)*\\s*" + + return [TIME, ANSI, PID, ANSI, THREAD, ANSI, LEVEL, ANSI, CATEGORY, + FILENAME, LINE, FUNCTION, ANSI, OBJECT, ANSI, MESSAGE] + + +class Parser(object): + '''Helper to parse a tracer log''' + + # record fields + F_TIME = 0 + F_PID = 1 + F_THREAD = 2 + F_LEVEL = 3 + F_CATEGORY = 4 + F_FILENAME = 5 + F_LINE = 6 + F_FUNCTION = 7 + F_OBJECT = 8 + F_MESSAGE = 9 + + def __init__(self, filename): + self.filename = filename + self.log_regex = re.compile(''.join(_log_line_regex())) + self.file = None + + def __enter__(self): + def __is_tracer(line): + return 'TRACE' in line + + if self.filename != '-': + self.file = open(self.filename, 'rt') + else: + self.file = sys.stdin + self.data = filter(__is_tracer, self.file) + return self + + def __exit__(self, *args): + if self.filename != '-': + self.file.close() + self.file = None + + def __iter__(self): + return self + + def __next__(self): + while True: + line = next(self.data) + match = self.log_regex.match(line) + if match: + g = list(match.groups()) + g[Parser.F_PID] = int(g[Parser.F_PID]) + g[Parser.F_LINE] = int(g[Parser.F_LINE]) + return g diff --git a/tracer/tracer/parser_test.py b/tracer/tracer/parser_test.py new file mode 100644 index 0000000000..0a3f6c1a3d --- /dev/null +++ b/tracer/tracer/parser_test.py @@ -0,0 +1,46 @@ +import sys +import unittest + +from tracer.parser import Parser + +TESTFILE = './logs/trace.latency.log' + +TEXT_DATA = ['first line', 'second line'] + +TRACER_LOG_DATA = [ + '0:00:00.079422574 7664 0x238ac70 TRACE GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045;' +] +TRACER_CLASS_LOG_DATA = [ + '0:00:00.041536066 1788 0x14b2150 TRACE GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(type\)guint64\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;";' +] + +class TestParser(unittest.TestCase): + + def test___init__(self): + log = Parser(TESTFILE) + self.assertIsNone(log.file) + + def test___enter___with_file(self): + with Parser(TESTFILE) as log: + self.assertIsNotNone(log.file) + + def test___enter___with_stdin(self): + sys.stdin = TEXT_DATA + with Parser('-') as log: + self.assertIsNotNone(log.file) + + def test_random_text_reports_none(self): + sys.stdin = TEXT_DATA + with Parser('-') as log: + with self.assertRaises(StopIteration): + next(log) + + def test_log_file_reports_trace_log(self): + with Parser(TESTFILE) as log: + self.assertIsNotNone(next(log)) + + def test_trace_log_parsed(self): + sys.stdin = TRACER_LOG_DATA + with Parser('-') as log: + event = next(log) + self.assertEquals(len(event), 10) diff --git a/tracer/tracer/structure.py b/tracer/tracer/structure.py new file mode 100644 index 0000000000..f41e2e6bfe --- /dev/null +++ b/tracer/tracer/structure.py @@ -0,0 +1,92 @@ +import re + +class Structure(object): + '''Gst Structure parser.''' + + def __init__(self, text): + self.text = text + self.name = None + self.types = {} + self.values = {} + self.pos = 0 + self.valid = False + try: + self._parse(self.text) + self.valid = True + except ValueError: + pass + + def __repr__(self): + return self.text + + def _parse(self, s): + scan = True + # parse id + p = s.find(',') + if p == -1: + p = s.index(';') + scan = False + self.name = s[:p] + s = s[(p + 2):] # skip 'name, ' + self.pos += p + 2 + # parse fields + while scan: + p = s.index('=') + k = s[:p] + s = s[(p + 1):] # skip 'key=' + self.pos += p + 1 + p = s.index('(') + s = s[(p + 1):] # skip '(' + self.pos += p + 1 + p = s.index(')') + t = s[:p] + s = s[(p + 1):] # skip 'type)' + self.pos += p + 1 + if t == 'structure': + p = s.index('"') + s = s[(p + 1):] # skip '"' + self.pos += p + 1 + # find next '"' without preceeding '\' + sub = s + sublen = 0 + while True: + p = sub.index('"') + sublen += p + 1 + if sub[p - 1] != '\\': + sub = None + break; + sub = sub[(p + 1):] + if not sub: + sub = s[:(sublen - 1)] + # unescape \., but not \\. (using a backref) + # FIXME: try to combine + # also: + # unescape = re.compile('search') + # unescape.sub('replacement', sub) + sub = re.sub(r'\\\\', r'\\', sub) + sub = re.sub(r'(?