tracer: add new python library to process tracer logs

This is the beginning of a Python library for writing tools that process tracer
logs. The library contains a structure parser written in pure Python to avoid
the dependency on GObject introspection (and the slowness and non-pythonic API
that comes with it).
commit 896201af35 (parent 9ca26b661d)
Stefan Sauer, 2016-12-12 22:38:57 +01:00
8 changed files with 529 additions and 0 deletions

tracer/Makefile (new file)

@@ -0,0 +1,12 @@
TEST_DATA = \
	logs/trace.latency.log

all:

logs/trace.latency.log:
	mkdir -p logs; \
	GST_DEBUG="GST_TRACER:7" GST_TRACERS=latency GST_DEBUG_FILE=$@ \
	gst-launch-1.0 -q audiotestsrc num-buffers=10 wave=silence ! audioconvert ! autoaudiosink

check: $(TEST_DATA)
	python3 -m unittest discover tracer

tracer/README (new file)

@@ -0,0 +1,78 @@
# Add a Python API for tracer analyzers

The Python framework will parse the tracer log and aggregate information.
The tool writer subclasses the Analyzer class and overrides methods like
'pad_push_buffer_pre'. There is one method for each hook. Each of those
methods receives the parsed log line. In addition the framework will offer
some extra API to allow one to e.g. write:
    pad.name()             # pad name
    pad.parent().name()    # element name
    pad.peer().parent()    # peer element
    pad.parent().state()   # element state
If we don't have full logging, we'd like to print a warning once, but keep it
non-fatal if possible. E.g. if we don't have logging for
element_{add,remove}_pad, we might not be able to provide pad.parent().

A tool can replay the log multiple times. If it does, it won't work in
'streaming' mode though. Streaming mode can offer live stats.
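A self-contained sketch of the dispatch idea behind those per-hook methods; MiniAnalyzer and LatencyPrinter are illustrative names, not part of this commit:

```python
# Minimal sketch of the planned hook dispatch: the framework looks up a
# method named after the tracer hook on the subclass and calls it if present.

class MiniAnalyzer:
    def dispatch(self, vmethod_name, event):
        vmethod = getattr(self, vmethod_name, None)
        if callable(vmethod):
            vmethod(event)

class LatencyPrinter(MiniAnalyzer):
    def __init__(self):
        self.seen = []

    def latency(self, event):  # one method per hook / tracer record
        self.seen.append(event)

a = LatencyPrinter()
a.dispatch('latency', ('0:00:00.142391137', 'latency, time=(guint64)47091349;'))
a.dispatch('pad_push_buffer_pre', ())  # no handler defined: silently skipped
```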
## TODO
Do we want to provide classes like GstBin, GstElement, GstPad, ... to
aggregate info? We'd also need to provide a way to e.g. add a GstLogAnalyzer
that knows about data from the log tracer and populates the classes. We need
to be able to build a pipeline of analyzers, e.g. the analyzer calls
GstLogAnalyzer in its catch-all handler and then processes some events
individually.

Parse the tracer classes. Add a helper that extracts numeric values and
aggregates min/max/avg. Consider other statistical information (std.
deviation) and provide a rolling average for live view.
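Such an aggregation helper could look like the sketch below, with an exponential moving average standing in for the rolling average; update_stats and the alpha value are assumptions, not existing code:

```python
# Aggregate num/sum/min/max per value and keep an exponential moving average
# as a cheap rolling average for a live view.

def new_stats():
    return {'num': 0, 'sum': 0, 'min': float('inf'), 'max': float('-inf'),
            'ema': 0.0}

def update_stats(stats, value, alpha=0.1):
    stats['num'] += 1
    stats['sum'] += value
    stats['min'] = min(stats['min'], value)
    stats['max'] = max(stats['max'], value)
    if stats['num'] == 1:
        # seed the moving average with the first sample
        stats['ema'] = float(value)
    else:
        stats['ema'] = alpha * value + (1.0 - alpha) * stats['ema']
    return stats

stats = new_stats()
for v in (10, 20, 30):
    update_stats(stats, v)
avg = stats['sum'] / stats['num']  # 20.0
```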
Think of how analyzer classes can be combined:
- we'd like to build tools that import other analyzer classes and chain the
processing.
## Examples
### Sequence chart generator (mscgen)

1.) Write the file header.
2.) Collect the element order:
Replay the log and use pad_link_pre to collect the pad->peer_pad
relationships. Build a sequence of element names and write it to the msc file.
3.) Collect event processing:
Replay the log and use pad_push_event_pre to output message lines to the msc
file.
4.) Write the footer and run the tool.
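The four steps could be sketched like this; the element names and the event are made up, and only the msc syntax itself is real mscgen:

```python
# Build an mscgen document following the steps above: header, element order,
# one message line per event, footer. msc_doc is an illustrative helper.

def msc_doc(elements, events):
    lines = ['msc {']                                   # 1.) file header
    lines.append('  %s;' % ', '.join(elements))         # 2.) element order
    for src, dst, label in events:                      # 3.) event lines
        lines.append('  %s->%s [ label="%s" ];' % (src, dst, label))
    lines.append('}')                                   # 4.) footer
    return '\n'.join(lines)

doc = msc_doc(['audiotestsrc0', 'autoaudiosink0'],
              [('audiotestsrc0', 'autoaudiosink0', 'segment')])
```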
## Latency stats
1.) Collect per-sink latencies and, for each sink, per-source latencies.
Calculate min, max, avg. Consider a streaming interface where we update the
stats e.g. once a second.
2.) In non-streaming mode, write the final statistics.
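A sketch of the per-sink/per-source bookkeeping, using the field names of the latency tracer record (src, sink, time); the aggregate helper and the sample entries are made up:

```python
# Aggregate latency per (sink, src) pair as entries stream in; in
# non-streaming mode the final dict is the report.

def aggregate(entries):
    stats = {}
    for e in entries:
        key = (e['sink'], e['src'])
        s = stats.setdefault(key, {'num': 0, 'sum': 0,
                                   'min': float('inf'), 'max': 0})
        t = e['time']
        s['num'] += 1
        s['sum'] += t
        s['min'] = min(s['min'], t)
        s['max'] = max(s['max'], t)
    return stats

stats = aggregate([
    {'src': 'source_src', 'sink': 'pulsesink0_sink', 'time': 47091349},
    {'src': 'source_src', 'sink': 'pulsesink0_sink', 'time': 47091351},
])
s = stats[('pulsesink0_sink', 'source_src')]
avg = s['sum'] // s['num']
```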
## CPU load stats

Like the latency stats, but for CPU load: process CPU load + per-thread CPU
load.
## top
Combine various stats tools into one.
# Improve tracers
## log
* the log tracer logs args and results into misc categories
* issues
  * not easy/reliable to detect its output among other trace output
  * not easy to match pre/post lines
  * uses its own do_log method instead of gst_tracer_record_log
    * if we also log structures, we need to log the 'function' as the
      structure name; fields would also be key=(type)val instead of key=value
    * if we switch to gst_tracer_record_log, we'd need to register 27 formats :/

tracer/tracer/analyzer.py (new file)

@@ -0,0 +1,116 @@
import re
import sys

from tracer.parser import Parser
from tracer.structure import Structure

_SCOPE_RELATED_TO = {
    'GST_TRACER_VALUE_SCOPE_PAD': 'Pad',
    'GST_TRACER_VALUE_SCOPE_ELEMENT': 'Element',
    'GST_TRACER_VALUE_SCOPE_THREAD': 'Thread',
    'GST_TRACER_VALUE_SCOPE_PROCESS': 'Process',
}

_NUMERIC_TYPES = ('int', 'uint', 'gint', 'guint', 'gint64', 'guint64')


class Analyzer(object):
    '''Base class for a gst tracer analyzer.'''

    def __init__(self, log):
        self.log = log
        self.records = {}
        self.data = {}

    def handle_tracer_class(self, event):
        s = Structure(event[Parser.F_MESSAGE])
        # TODO only for debugging
        #print("tracer class:", repr(s))
        name = s.name[:-len('.class')]
        record = {
            'class': s,
            'scope': {},
            'value': {},
        }
        self.records[name] = record
        for k, v in s.values.items():
            if v.name == 'scope':
                # TODO only for debugging
                #print("scope: [%s]=%s" % (k, v))
                record['scope'][k] = v
            elif v.name == 'value':
                # skip non numeric and those without min/max
                if (v.values['type'] in _NUMERIC_TYPES and
                        'min' in v.values and 'max' in v.values):
                    # TODO only for debugging
                    #print("value: [%s]=%s" % (k, v))
                    record['value'][k] = v
                #else:
                #    # TODO only for debugging
                #    print("skipping value: [%s]=%s" % (k, v))

    def handle_tracer_entry(self, event):
        # unless the log line has a function field, use the first field in
        # the message (the structure name) as the vmethod name
        s = None
        if event[Parser.F_FUNCTION]:
            # TODO: parse params in event[Parser.F_MESSAGE]
            vmethod_name = event[Parser.F_FUNCTION]
        else:
            s = Structure(event[Parser.F_MESSAGE])
            vmethod_name = s.name

        record = self.records.get(vmethod_name)
        if record and s:
            # aggregate event based on class
            for sk, sv in record['scope'].items():
                # look up bin by scope (or create new)
                key = (_SCOPE_RELATED_TO[sv.values['related-to']] +
                       ":" + str(s.values[sk]))
                scope = self.data.get(key)
                if not scope:
                    scope = {}
                    self.data[key] = scope
                for vk, vv in record['value'].items():
                    key = vmethod_name + "/" + vk
                    data = scope.get(key)
                    if not data:
                        data = {
                            'num': 0,
                            'sum': 0,
                        }
                        if 'max' in vv.values and 'min' in vv.values:
                            # seed 'min' with the largest and 'max' with the
                            # smallest possible value, so that the first
                            # sample replaces both
                            data['min'] = int(vv.values['max'])
                            data['max'] = int(vv.values['min'])
                        scope[key] = data
                    # update min/max/sum and count via value
                    dv = int(s.values[vk])
                    data['num'] += 1
                    data['sum'] += dv
                    if 'min' in data:
                        data['min'] = min(dv, data['min'])
                    if 'max' in data:
                        data['max'] = max(dv, data['max'])

        # TODO: check if self has a catch-all handler and call it first
        # (check this in init)
        # - we can use this to chain filters, although the chained filter
        #   would be doing the same as below

        # check if self has a method named vmethod_name, if so call it
        vmethod = getattr(self, vmethod_name, None)
        if callable(vmethod):
            vmethod(event)

    def is_tracer_class(self, event):
        return (event[Parser.F_FILENAME] == 'gsttracerrecord.c' and
                event[Parser.F_CATEGORY] == 'GST_TRACER' and
                '.class' in event[Parser.F_MESSAGE])

    def is_tracer_entry(self, event):
        return (not event[Parser.F_LINE] and not event[Parser.F_FILENAME])

    def run(self):
        for event in self.log:
            # check if it is a tracer.class or tracer event
            if self.is_tracer_class(event):
                self.handle_tracer_class(event)
            elif self.is_tracer_entry(event):
                self.handle_tracer_entry(event)
            #else:
            #    print("unhandled:", repr(event))

@@ -0,0 +1,18 @@
import unittest
from tracer.analyzer import Analyzer
TRACER_CLASS = ('0:00:00.036373170', 1788, '0x23bca70', 'TRACE', 'GST_TRACER', 'gsttracerrecord.c', 110, 'gst_tracer_record_build_format', None, r'latency.class, src=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", sink=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", time=(structure)"value\\,\\ type\\=\\(type\\)guint64\\,\\ description\\=\\(string\\)\\"time\\\\\\ it\\\\\\ took\\\\\\ for\\\\\\ the\\\\\\ buffer\\\\\\ to\\\\\\ go\\\\\\ from\\\\\\ src\\\\\\ to\\\\\\ sink\\\\\\ ns\\"\\,\\ flags\\=\\(GstTracerValueFlags\\)GST_TRACER_VALUE_FLAGS_AGGREGATED\\,\\ min\\=\\(guint64\\)0\\,\\ max\\=\\(guint64\\)18446744073709551615\\;";')
TRACER_ENTRY = ('0:00:00.142391137', 1788, '0x7f8a201056d0', 'TRACE', 'GST_TRACER', '', 0, '', None, r'latency, src=(string)source_src, sink=(string)pulsesink0_sink, time=(guint64)47091349;')
class TestAnalyzer(unittest.TestCase):

    def test_detect_tracer_class(self):
        a = Analyzer(None)
        self.assertTrue(a.is_tracer_class(TRACER_CLASS))

    def test_detect_tracer_entry(self):
        a = Analyzer(None)
        self.assertTrue(a.is_tracer_entry(TRACER_ENTRY))

tracer/tracer/parser.py (new file)

@@ -0,0 +1,86 @@
import re
import sys
# new tracer class
# 0:00:00.041536066 1788 0x14b2150 TRACE GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(GType\)NULL\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;";
# tracer log entry
# 0:00:00.079422574 7664 0x238ac70 TRACE GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045;
# from log tracer
# 0:00:00.460486001 18356 0x21de780 TRACE GST_ELEMENT_PADS :0:do_element_add_pad:<GstBaseSink@0x429e880> 0:00:00.460483603, key=val, ...
def _log_line_regex():
    # "0:00:00.777913000 "
    TIME = r"(\d+:\d\d:\d\d\.\d+)\s+"
    # "DEBUG "
    LEVEL = r"([A-Z]+)\s+"
    # "0x8165430 "
    THREAD = r"(0x[0-9a-f]+)\s+"
    # "GST_REFCOUNTING ", "flacdec "
    CATEGORY = r"([A-Za-z0-9_-]+)\s+"
    # " 3089 "
    PID = r"(\d+)\s*"
    FILENAME = r"([^:]*):"
    LINE = r"(\d+):"
    FUNCTION = r"([A-Za-z0-9_]*):"
    # FIXME: When non-g(st)object stuff is logged with *_OBJECT (like
    # buffers!), the address is printed *without* <> brackets!
    OBJECT = r"(?:<([^>]+)>)?"
    MESSAGE = r"(.+)"
    ANSI = "(?:\x1b\\[[0-9;]*m\\s*)*\\s*"

    return [TIME, ANSI, PID, ANSI, THREAD, ANSI, LEVEL, ANSI, CATEGORY,
            FILENAME, LINE, FUNCTION, ANSI, OBJECT, ANSI, MESSAGE]
class Parser(object):
    '''Helper to parse a tracer log.'''

    # record fields
    F_TIME = 0
    F_PID = 1
    F_THREAD = 2
    F_LEVEL = 3
    F_CATEGORY = 4
    F_FILENAME = 5
    F_LINE = 6
    F_FUNCTION = 7
    F_OBJECT = 8
    F_MESSAGE = 9

    def __init__(self, filename):
        self.filename = filename
        self.log_regex = re.compile(''.join(_log_line_regex()))
        self.file = None

    def __enter__(self):
        def __is_tracer(line):
            return 'TRACE' in line

        if self.filename != '-':
            self.file = open(self.filename, 'rt')
        else:
            self.file = sys.stdin
        self.data = filter(__is_tracer, self.file)
        return self

    def __exit__(self, *args):
        if self.filename != '-':
            self.file.close()
            self.file = None

    def __iter__(self):
        return self

    def __next__(self):
        while True:
            line = next(self.data)
            match = self.log_regex.match(line)
            if match:
                g = list(match.groups())
                g[Parser.F_PID] = int(g[Parser.F_PID])
                g[Parser.F_LINE] = int(g[Parser.F_LINE])
                return g

@@ -0,0 +1,46 @@
import sys
import unittest
from tracer.parser import Parser
TESTFILE = './logs/trace.latency.log'
TEXT_DATA = ['first line', 'second line']
TRACER_LOG_DATA = [
'0:00:00.079422574 7664 0x238ac70 TRACE GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045;'
]
TRACER_CLASS_LOG_DATA = [
'0:00:00.041536066 1788 0x14b2150 TRACE GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(type\)guint64\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;";'
]
class TestParser(unittest.TestCase):

    def test___init__(self):
        log = Parser(TESTFILE)
        self.assertIsNone(log.file)

    def test___enter___with_file(self):
        with Parser(TESTFILE) as log:
            self.assertIsNotNone(log.file)

    def test___enter___with_stdin(self):
        sys.stdin = TEXT_DATA
        with Parser('-') as log:
            self.assertIsNotNone(log.file)

    def test_random_text_reports_none(self):
        sys.stdin = TEXT_DATA
        with Parser('-') as log:
            with self.assertRaises(StopIteration):
                next(log)

    def test_log_file_reports_trace_log(self):
        with Parser(TESTFILE) as log:
            self.assertIsNotNone(next(log))

    def test_trace_log_parsed(self):
        sys.stdin = TRACER_LOG_DATA
        with Parser('-') as log:
            event = next(log)
            self.assertEqual(len(event), 10)

@@ -0,0 +1,92 @@
import re


class Structure(object):
    '''Gst Structure parser.'''

    def __init__(self, text):
        self.text = text
        self.name = None
        self.types = {}
        self.values = {}
        self.pos = 0
        self.valid = False
        try:
            self._parse(self.text)
            self.valid = True
        except ValueError:
            pass

    def __repr__(self):
        return self.text

    def _parse(self, s):
        scan = True
        # parse id
        p = s.find(',')
        if p == -1:
            p = s.index(';')
            scan = False
        self.name = s[:p]
        s = s[(p + 2):]  # skip 'name, '
        self.pos += p + 2
        # parse fields
        while scan:
            p = s.index('=')
            k = s[:p]
            s = s[(p + 1):]  # skip 'key='
            self.pos += p + 1
            p = s.index('(')
            s = s[(p + 1):]  # skip '('
            self.pos += p + 1
            p = s.index(')')
            t = s[:p]
            s = s[(p + 1):]  # skip 'type)'
            self.pos += p + 1
            if t == 'structure':
                p = s.index('"')
                s = s[(p + 1):]  # skip '"'
                self.pos += p + 1
                # find next '"' without preceding '\'
                sub = s
                sublen = 0
                while True:
                    p = sub.index('"')
                    sublen += p + 1
                    if sub[p - 1] != '\\':
                        sub = None
                        break
                    sub = sub[(p + 1):]
                if not sub:
                    sub = s[:(sublen - 1)]
                    # unescape \., but not \\. (using a backref)
                    # FIXME: try to combine
                    # also:
                    # unescape = re.compile('search')
                    # unescape.sub('replacement', sub)
                    sub = re.sub(r'\\\\', r'\\', sub)
                    sub = re.sub(r'(?<!\\)\\(.)', r'\1', sub)
                    sub = re.sub(r'(?<!\\)\\(.)', r'\1', sub)
                    # recurse
                    v = Structure(sub)
                    if s[sublen] == ';':
                        scan = False
                    s = s[(sublen + 2):]
                    self.pos += sublen + 2
                else:
                    raise ValueError
            else:
                p = s.find(',')
                if p == -1:
                    p = s.index(';')
                    scan = False
                v = s[:p]
                s = s[(p + 2):]  # skip 'value, '
                self.pos += p + 2
                if t == 'string' and v[0] == '"':
                    v = v[1:-1]
                elif t in ['int', 'uint', 'int8', 'uint8', 'int16', 'uint16',
                           'int32', 'uint32', 'int64', 'uint64']:
                    v = int(v)
            self.types[k] = t
            self.values[k] = v
        self.valid = True

@@ -0,0 +1,81 @@
import unittest
from tracer.structure import Structure
BAD_NAME = r'foo bar'
BAD_KEY = r'foo, bar'
BAD_TYPE1 = r'foo, bar=['
BAD_TYPE2 = r'foo, bar=(int'
EMPTY_STRUCTURE = r'foo;'
SINGLE_VALUE_STRUCTURE = r'foo, key=(string)"value";'
MISC_TYPES_STRUCTURE = r'foo, key1=(string)"value", key2=(int)5, key3=(boolean)true;'
NESTED_STRUCTURE = r'foo, nested=(structure)"bar\,\ key1\=\(int\)0\,\ key2\=\(int\)5\;";'
class TestStructure(unittest.TestCase):

    def test_handles_bad_name(self):
        structure = Structure(BAD_NAME)
        self.assertFalse(structure.valid)
        self.assertEqual(structure.pos, 0)

    def test_handles_bad_key(self):
        structure = Structure(BAD_KEY)
        self.assertFalse(structure.valid)
        self.assertEqual(structure.pos, 5)

    def test_handles_bad_type1(self):
        structure = Structure(BAD_TYPE1)
        self.assertFalse(structure.valid)
        self.assertEqual(structure.pos, 9)

    def test_handles_bad_type2(self):
        structure = Structure(BAD_TYPE2)
        self.assertFalse(structure.valid)
        self.assertEqual(structure.pos, 10)

    def test_parses_empty_structure(self):
        structure = Structure(EMPTY_STRUCTURE)
        self.assertTrue(structure.valid)

    def test_parses_name_in_empty_structure(self):
        structure = Structure(EMPTY_STRUCTURE)
        self.assertEqual(structure.name, 'foo')

    def test_parses_single_value_structure(self):
        structure = Structure(SINGLE_VALUE_STRUCTURE)
        self.assertTrue(structure.valid)

    def test_parses_name(self):
        structure = Structure(SINGLE_VALUE_STRUCTURE)
        self.assertEqual(structure.name, 'foo')

    def test_parses_key(self):
        structure = Structure(SINGLE_VALUE_STRUCTURE)
        self.assertIn('key', structure.types)
        self.assertIn('key', structure.values)

    def test_parses_type(self):
        structure = Structure(SINGLE_VALUE_STRUCTURE)
        self.assertEqual(structure.types['key'], 'string')

    def test_parses_string_value(self):
        structure = Structure(MISC_TYPES_STRUCTURE)
        self.assertEqual(structure.values['key1'], 'value')

    def test_parses_int_value(self):
        structure = Structure(MISC_TYPES_STRUCTURE)
        self.assertEqual(structure.values['key2'], 5)

    def test_parses_nested_structure(self):
        structure = Structure(NESTED_STRUCTURE)
        self.assertTrue(structure.valid)
        sub = structure.values['nested']
        self.assertTrue(sub.valid)

    def test_nested_structure_has_sub_structure(self):
        structure = Structure(NESTED_STRUCTURE)
        self.assertEqual(structure.types['nested'], 'structure')
        self.assertIsInstance(structure.values['nested'], Structure)