gstreamer/subprojects/gst-devtools/validate/launcher/reporters.py

251 lines
8.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2013-12-31 10:45:07 +00:00
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
""" Test Reporters implementation. """
import os
import re
import time
2013-12-31 10:45:07 +00:00
import codecs
import datetime
import tempfile
from .loggable import Loggable
2013-12-31 10:45:07 +00:00
from xml.sax import saxutils
from .utils import Result, printc, Colors
2013-12-31 10:45:07 +00:00
UNICODE_STRINGS = (type(str()) == type(str())) # noqa
2013-12-31 10:45:07 +00:00
class UnknownResult(Exception):
pass
CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]")
def xml_safe(value):
"""Replaces invalid XML characters with '?'."""
return CONTROL_CHARACTERS.sub('?', value)
def escape_cdata(cdata):
"""Escape a string for an XML CDATA section."""
return xml_safe(cdata).replace(']]>', ']]>]]&gt;<![CDATA[')
class Reporter(Loggable):
2013-12-31 10:45:07 +00:00
name = 'simple'
def __init__(self, options):
Loggable.__init__(self)
2013-12-31 10:45:07 +00:00
self.options = options
self._start_time = 0
2013-12-31 10:45:07 +00:00
self.stats = {'timeout': 0,
'failures': 0,
'passed': 0,
'skipped': 0,
'known_error': 0
2013-12-31 10:45:07 +00:00
}
self.results = []
def init_timer(self):
"""Initialize a timer before starting tests."""
self._start_time = time.time()
2013-12-31 10:45:07 +00:00
def set_failed(self, test):
if test.result == Result.SKIPPED:
self.stats["skipped"] += 1
else:
self.stats["failures"] += 1
2013-12-31 10:45:07 +00:00
def set_passed(self, test):
if test.result == Result.KNOWN_ERROR:
self.stats["known_error"] += 1
else:
self.stats["passed"] += 1
2013-12-31 10:45:07 +00:00
def add_results(self, test):
self.debug("%s", test)
if test.result == Result.PASSED or \
test.result == Result.KNOWN_ERROR:
2013-12-31 10:45:07 +00:00
self.set_passed(test)
elif test.result == Result.FAILED or \
test.result == Result.TIMEOUT or \
test.result == Result.SKIPPED:
2013-12-31 10:45:07 +00:00
self.set_failed(test)
else:
raise UnknownResult("%s" % test.result)
def after_test(self, test):
if test not in self.results:
self.results.append(test)
self.add_results(test)
2013-12-31 10:45:07 +00:00
def final_report(self):
print("\n")
lenstat = (len("Statistics") + 1)
printc("Statistics:\n%s" % (lenstat * "-"), Colors.OKBLUE)
if self._start_time > 0:
printc("\n%sTotal time spent: %s seconds\n" %
((lenstat * " "), datetime.timedelta(
seconds=(time.time() - self._start_time))),
Colors.OKBLUE)
printc("%sPassed: %d" %
(lenstat * " ", self.stats["passed"]), Colors.OKGREEN)
printc("%sSkipped: %d" %
(lenstat * " ", self.stats["skipped"]), Colors.WARNING)
printc("%sFailed: %d" %
(lenstat * " ", self.stats["failures"]), Colors.FAIL)
printc("%sKnown error: %d" %
(lenstat * " ", self.stats["known_error"]), Colors.OKBLUE)
printc("%s%s" %
(lenstat * " ", (len("Failed: 0")) * "-"), Colors.OKBLUE)
total = self.stats["failures"] + self.stats["passed"]
color = Colors.WARNING
if total == self.stats["passed"]:
color = Colors.OKGREEN
elif total == self.stats["failures"]:
color = Colors.FAIL
printc("%sTotal: %d" % (lenstat * " ", total), color)
return self.stats["failures"]
2013-12-31 10:45:07 +00:00
class XunitReporter(Reporter):
2013-12-31 10:45:07 +00:00
"""This reporter provides test results in the standard XUnit XML format."""
name = 'xunit'
encoding = 'UTF-8'
def __init__(self, options):
super(XunitReporter, self).__init__(options)
self._createTmpFile()
2013-12-31 10:45:07 +00:00
def final_report(self):
self.report()
return super(XunitReporter, self).final_report()
2013-12-31 10:45:07 +00:00
def _get_all_logs_data(self, test):
if self.options.redirect_logs:
return ""
captured = ""
value = test.get_log_content()
if value:
captured += escape_cdata(value)
for extralog in test.extra_logfiles:
captured += "\n\n===== %s =====\n\n" % escape_cdata(
os.path.basename(extralog))
value = test.get_extra_log_content(extralog)
captured += escape_cdata(value)
return captured
2013-12-31 10:45:07 +00:00
def _get_captured(self, test):
return '<system-out><![CDATA[%s]]></system-out>' % self._get_all_logs_data(test)
2013-12-31 10:45:07 +00:00
def _quoteattr(self, attr):
"""Escape an XML attribute. Value can be unicode."""
attr = xml_safe(attr)
if isinstance(attr, str) and not UNICODE_STRINGS:
2013-12-31 10:45:07 +00:00
attr = attr.encode(self.encoding)
return saxutils.quoteattr(attr)
def report(self):
"""Writes an Xunit-formatted XML file
The file includes a report of test errors and failures.
"""
self.debug("Writing XML file to: %s", self.options.xunit_file)
xml_file = codecs.open(self.options.xunit_file, 'w',
self.encoding, errors = 'replace')
2013-12-31 10:45:07 +00:00
self.stats['encoding'] = self.encoding
self.stats['total'] = (self.stats['timeout'] + self.stats['failures']
+ self.stats['passed'] + self.stats['skipped'])
xml_file.write('<?xml version="1.0" encoding="%(encoding)s"?>'
'<testsuite name="gst-validate-launcher" tests="%(total)d" '
'errors="%(timeout)d" failures="%(failures)d" '
'skipped="%(skipped)d">' % self.stats)
tmp_xml_file = codecs.open(self.tmp_xml_file.name, 'r',
self.encoding, errors = 'replace')
for l in tmp_xml_file:
xml_file.write(l)
xml_file.write('</testsuite>')
xml_file.close()
tmp_xml_file.close()
os.remove(self.tmp_xml_file.name)
self._createTmpFile()
def _createTmpFile(self):
self.tmp_xml_file = tempfile.NamedTemporaryFile(delete=False)
self.tmp_xml_file.close()
2013-12-31 10:45:07 +00:00
def set_failed(self, test):
"""Add failure output to Xunit report.
"""
super().set_failed(test)
xml_file = codecs.open(self.tmp_xml_file.name, 'a',
self.encoding, errors = 'replace')
xml_file.write(self._forceUnicode(
'<testcase name=%(name)s time="%(taken).3f">%(systemout)s'
'<failure type=%(errtype)s message=%(message)s>'
'</failure></testcase>' %
{'name': self._quoteattr(test.get_classname() + '.' + test.get_name()),
2013-12-31 10:45:07 +00:00
'taken': test.time_taken,
'systemout': self._get_captured(test),
2013-12-31 10:45:07 +00:00
'errtype': self._quoteattr(test.result),
'message': self._quoteattr(test.message),
}))
xml_file.close()
2013-12-31 10:45:07 +00:00
def set_passed(self, test):
"""Add success output to Xunit report.
"""
self.stats['passed'] += 1
xml_file = codecs.open(self.tmp_xml_file.name, 'a',
self.encoding, errors = 'replace')
xml_file.write(self._forceUnicode(
'<testcase name=%(name)s '
2013-12-31 10:45:07 +00:00
'time="%(taken).3f">%(systemout)s</testcase>' %
{'name': self._quoteattr(test.get_classname() + '.' + test.get_name()),
2013-12-31 10:45:07 +00:00
'taken': test.time_taken,
'systemout': self._get_captured(test),
}))
xml_file.close()
2013-12-31 10:45:07 +00:00
def _forceUnicode(self, s):
if not UNICODE_STRINGS:
if isinstance(s, str):
s = s.decode(self.encoding, errors = 'replace')
2013-12-31 10:45:07 +00:00
return s