gstreamer/validate/launcher/reporters.py

239 lines
8 KiB
Python

#!/usr/bin/env python2
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
""" Test Reporters implementation. """
import os
import re
import sys
import time
import codecs
import datetime
from loggable import Loggable
from xml.sax import saxutils
from utils import mkdir, Result, printc, Colors
UNICODE_STRINGS = (type(unicode()) == type(str()))
class UnknownResult(Exception):
pass
CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]")
def xml_safe(value):
"""Replaces invalid XML characters with '?'."""
return CONTROL_CHARACTERS.sub('?', value)
def escape_cdata(cdata):
"""Escape a string for an XML CDATA section."""
return xml_safe(cdata).replace(']]>', ']]>]]&gt;<![CDATA[')
class Reporter(Loggable):
name = 'simple'
def __init__(self, options):
Loggable.__init__(self)
self._current_test = None
self.out = None
self.options = options
self._start_time = 0
self.stats = {'timeout': 0,
'failures': 0,
'passed': 0,
'skipped': 0
}
self.results = []
def uses_standard_output(self):
return self.out in [sys.stdout, sys.stderr]
def before_test(self, test):
"""Initialize a timer before starting a test."""
if self.options.logsdir == 'stdout':
self.out = sys.stdout
test.logfile = 'stdout'
elif self.options.logsdir == 'stderr':
self.out = sys.stderr
test.logfile = 'stderr'
else:
path = os.path.join(self.options.logsdir,
test.classname.replace(".", os.sep))
mkdir(os.path.dirname(path))
self.out = open(path, 'w+')
test.logfile = path
self._current_test = test
if self._start_time == 0:
self._start_time = time.time()
def set_failed(self, test):
self.stats["failure"] += 1
def set_passed(self, test):
self.stats["passed"] += 1
def add_results(self, test):
self.debug("%s", test)
if test.result == Result.PASSED:
self.set_passed(test)
elif test.result == Result.FAILED or \
test.result == Result.TIMEOUT:
self.set_failed(test)
else:
raise UnknownResult("%s" % test.result)
def after_test(self):
if self._current_test not in self.results:
self.results.append(self._current_test)
self.add_results(self._current_test)
if not self.uses_standard_output():
self.out.close()
self.out = None
self._current_test = None
def final_report(self):
print "\n"
printc("Final Report:", title=True)
for test in sorted(self.results, key=lambda test: test.result):
printc(test)
if test.result != Result.PASSED:
print "\n"
print "\n"
lenstat = (len("Statistics") + 1)
printc("Statistics:\n%s" %(lenstat * "-"), Colors.OKBLUE)
printc("\n%sTotal time spent: %s seconds\n" %
((lenstat * " "), datetime.timedelta(seconds=(time.time() - self._start_time))),
Colors.OKBLUE)
printc("%sPassed: %d" % (lenstat * " ", self.stats["passed"]), Colors.OKGREEN)
printc("%sFailed: %d" % (lenstat * " ", self.stats["failures"]), Colors.FAIL)
printc("%s%s" %(lenstat * " ", (len("Failed: 0")) * "-"), Colors.OKBLUE)
total = self.stats["failures"] + self.stats["passed"]
color = Colors.WARNING
if total == self.stats["passed"]:
color = Colors.OKGREEN
elif total == self.stats["failures"]:
color = Colors.FAIL
printc("%sTotal: %d" % (lenstat * " ", total), color)
class XunitReporter(Reporter):
"""This reporter provides test results in the standard XUnit XML format."""
name = 'xunit'
encoding = 'UTF-8'
xml_file = None
def __init__(self, options):
super(XunitReporter, self).__init__(options)
self.errorlist = []
def final_report(self):
self.report()
super(XunitReporter, self).final_report()
def _get_captured(self):
captured = ""
if self.out and not self.uses_standard_output():
self.out.seek(0)
value = self.out.read()
if value:
captured += '<system-out><![CDATA[%s' % \
escape_cdata(value)
for extralog in self._current_test.extra_logfiles:
captured += "\n\n===== %s =====\n\n" % escape_cdata(os.path.basename(extralog))
value = self._current_test.get_extra_log_content(extralog)
captured += escape_cdata(value)
captured += "]]></system-out>"
return captured
def _quoteattr(self, attr):
"""Escape an XML attribute. Value can be unicode."""
attr = xml_safe(attr)
if isinstance(attr, unicode) and not UNICODE_STRINGS:
attr = attr.encode(self.encoding)
return saxutils.quoteattr(attr)
def report(self):
"""Writes an Xunit-formatted XML file
The file includes a report of test errors and failures.
"""
self.debug("Writing XML file to: %s", self.options.xunit_file)
self.xml_file = codecs.open(self.options.xunit_file, 'w',
self.encoding, 'replace')
self.stats['encoding'] = self.encoding
self.stats['total'] = (self.stats['timeout'] + self.stats['failures']
+ self.stats['passed'] + self.stats['skipped'])
self.xml_file.write( u'<?xml version="1.0" encoding="%(encoding)s"?>'
u'<testsuite name="gst-validate-launcher" tests="%(total)d" '
u'errors="%(timeout)d" failures="%(failures)d" '
u'skip="%(skipped)d">' % self.stats)
self.xml_file.write(u''.join([self._forceUnicode(e)
for e in self.errorlist]))
self.xml_file.write(u'</testsuite>')
self.xml_file.close()
def set_failed(self, test):
"""Add failure output to Xunit report.
"""
self.stats['failures'] += 1
self.errorlist.append(
'<testcase classname=%(cls)s name=%(name)s time="%(taken).3f">'
'<failure type=%(errtype)s message=%(message)s>'
'</failure>%(systemout)s</testcase>' %
{'cls': self._quoteattr(test.get_classname()),
'name': self._quoteattr(test.get_name()),
'taken': test.time_taken,
'errtype': self._quoteattr(test.result),
'message': self._quoteattr(test.message),
'systemout': self._get_captured(),
})
def set_passed(self, test):
"""Add success output to Xunit report.
"""
self.stats['passed'] += 1
self.errorlist.append(
'<testcase classname=%(cls)s name=%(name)s '
'time="%(taken).3f">%(systemout)s</testcase>' %
{'cls': self._quoteattr(test.get_classname()),
'name': self._quoteattr(test.get_name()),
'taken': test.time_taken,
'systemout': self._get_captured(),
})
def _forceUnicode(self, s):
if not UNICODE_STRINGS:
if isinstance(s, str):
s = s.decode(self.encoding, 'replace')
return s