gstreamer/validate/tools/launcher/baseclasses.py

782 lines
24 KiB
Python
Raw Normal View History

2013-12-31 10:45:07 +00:00
#!/usr/bin/python
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
""" Class representing tests and test managers. """
import os
import sys
2013-12-31 10:45:07 +00:00
import re
import time
import utils
import signal
import urlparse
2013-12-31 10:45:07 +00:00
import subprocess
import reporters
import ConfigParser
from loggable import Loggable
from optparse import OptionGroup
2013-12-31 10:45:07 +00:00
from utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND
2013-12-31 10:45:07 +00:00
class Test(Loggable):

    """ A class representing a particular test. """

    def __init__(self, application_name, classname, options,
                 reporter, timeout=DEFAULT_TIMEOUT, hard_timeout=None):
        """
        @application_name: Command used to start the application under test,
                           it is the first element of the launched command
        @classname: Dotted name identifying the test
        @options: The launcher options (only `debug` is read by this class)
        @reporter: The reporter, its `out` file object receives the stdout
                   and stderr of the launched process
        @timeout: The timeout during which the value return by get_current_value
                  keeps being exactly equal
        @hard_timeout: Max time the test can take in absolute
        """
        Loggable.__init__(self)
        self.timeout = timeout
        self.hard_timeout = hard_timeout
        self.classname = classname
        self.options = options
        self.application = application_name
        self.command = ""
        self.reporter = reporter
        self.process = None

        self.clean()

    def clean(self):
        """ Reset everything that is specific to one run of the test. """
        self.message = ""
        self.error_str = ""
        self.time_taken = 0.0
        self._starting_time = None
        self.result = Result.NOT_RUN
        self.logfile = None
        self.extra_logfiles = []

    def __str__(self):
        string = self.classname
        if self.result != Result.NOT_RUN:
            string += ": " + self.result
            if self.result in [Result.FAILED, Result.TIMEOUT]:
                string += " '%s'\n" \
                          " You can reproduce with: %s\n" \
                          " You can find logs in:\n" \
                          " - %s" % (self.message, self.command,
                                    self.logfile)
                for log in self.extra_logfiles:
                    string += "\n - %s" % log

        return string

    def get_extra_log_content(self, extralog):
        """
        Return the content of @extralog, or an empty string if it is not
        one of the extra log files registered on this test.
        """
        if extralog not in self.extra_logfiles:
            return ""

        # Reading only: no need for write access, and make sure the file is
        # closed even if read() raises.
        with open(extralog, 'r') as f:
            return f.read()

    def get_classname(self):
        """ Return the classname without its last dotted component. """
        # Only strip the *last* component: str.replace() would also remove
        # earlier occurrences of ".<name>" (e.g. "a.b.b" would become "a").
        return self.classname.rsplit('.', 1)[0]

    def get_name(self):
        """ Return the last dotted component of the classname. """
        return self.classname.split('.')[-1]

    def add_arguments(self, *args):
        """ Append each of @args, space separated, to the command line. """
        for arg in args:
            self.command += " " + arg

    def build_arguments(self):
        """ Hook for subclasses to add their arguments to the command. """
        pass

    def set_result(self, result, message="", error=""):
        """
        Record the outcome of the test.

        When @result is a timeout and the `debug` option is set, the user
        is prompted so that a debugger can be attached to the process
        before going on.
        """
        self.debug("Setting result: %s (message: %s, error: %s", result,
                   message, error)
        if result is Result.TIMEOUT and self.options.debug is True:
            pname = subprocess.check_output(("readlink -e /proc/%s/exe"
                                             % self.process.pid).split(' ')).replace('\n', '')
            raw_input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
                      "Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
                                                   Colors.ENDC))

        self.result = result
        self.message = message
        self.error_str = error

    def check_results(self):
        """ Compute the final result from the state of the finished run. """
        if self.result is Result.FAILED:
            return

        self.debug("%s returncode: %s", self, self.process.returncode)
        if self.result == Result.TIMEOUT:
            # The timeout has already been flagged through set_result() by
            # wait_process(). Do not call set_result() again: it would drop
            # a more specific message (hard timeout) and prompt the user a
            # second time in debug mode.
            if not self.message:
                self.message = "Application timed out"
            self.error_str = "timeout"
        elif self.process.returncode == 0:
            self.set_result(Result.PASSED)
        else:
            self.set_result(Result.FAILED,
                            "Application returned %d" % (
                                self.process.returncode))

    def get_current_value(self):
        """
        Lets subclasses implement a nicer timeout measurement method
        They should return some value with which we will compare
        the previous and timeout if they are equal during self.timeout
        seconds
        """
        return Result.NOT_RUN

    def wait_process(self):
        """
        Poll the process once per second until it exits, times out, or
        get_current_value() reports Result.FAILED / Result.KNOWN_ERROR, then
        call check_results().
        """
        last_val = 0
        last_change_ts = time.time()
        start_ts = time.time()
        while True:
            self.process.poll()
            if self.process.returncode is not None:
                break

            # Dirty way to avoid eating to much CPU...
            # good enough for us anyway.
            time.sleep(1)

            val = self.get_current_value()

            self.debug("Got value: %s" % val)
            if val is Result.NOT_RUN:
                # The get_current_value logic is not implemented... dumb timeout
                if time.time() - last_change_ts > self.timeout:
                    self.set_result(Result.TIMEOUT)
                    break
                continue
            elif val is Result.FAILED:
                break
            elif val is Result.KNOWN_ERROR:
                break

            self.log("New val %s" % val)

            if val == last_val:
                delta = time.time() - last_change_ts
                self.debug("%s: Same value for %d/%d seconds" % (self, delta, self.timeout))
                if delta > self.timeout:
                    self.set_result(Result.TIMEOUT)
                    break
            elif self.hard_timeout and time.time() - start_ts > self.hard_timeout:
                # The message has to be formatted here: set_result() takes
                # (result, message, error), it does no interpolation.
                self.set_result(Result.TIMEOUT,
                                "Hard timeout reached: %d" % self.hard_timeout,
                                "timeout")
                break
            else:
                last_change_ts = time.time()
                last_val = val

        self.check_results()

    def get_subproc_env(self):
        """ Return the environment the test process is launched with. """
        return os.environ

    def run(self):
        """
        Build the command line, launch it through the shell, wait for it to
        finish and return the resulting Result.
        """
        self.command = "%s " % (self.application)
        self._starting_time = time.time()
        self.build_arguments()
        proc_env = self.get_subproc_env()

        message = "Launching: %s%s\n" \
                  " Command: '%s'\n" \
                  " Logs:\n" \
                  " - %s" % (Colors.ENDC, self.classname,
                            self.command, self.logfile)
        for log in self.extra_logfiles:
            message += "\n - %s" % log
        printc(message, Colors.OKBLUE)

        try:
            # "exec" makes the shell replace itself with the application so
            # that self.process is the application itself.
            self.process = subprocess.Popen("exec " + self.command,
                                            stderr=self.reporter.out,
                                            stdout=self.reporter.out,
                                            shell=True,
                                            env=proc_env)
            self.wait_process()
        except KeyboardInterrupt:
            # The interruption can happen before Popen() returned, or once
            # the process is already gone: never let that hide the
            # KeyboardInterrupt itself.
            if self.process is not None:
                try:
                    self.process.send_signal(signal.SIGINT)
                except OSError:
                    pass
            raise

        try:
            self.process.send_signal(signal.SIGINT)
        except OSError:
            # Raised when the process already exited, nothing to stop.
            pass

        self.time_taken = time.time() - self._starting_time

        self.reporter.out.seek(0)
        self.reporter.out.write("=================\n"
                                "Test name: %s\n"
                                "Command: '%s'\n"
                                "=================\n\n"
                                % (self.classname, self.command))
        printc("Result: %s%s\n" % (self.result,
               " (" + self.message + ")" if self.message else ""),
               color=utils.get_color_for_result(self.result))

        return self.result
2013-12-31 10:45:07 +00:00
class GstValidateTest(Test):

    """ A Test whose application writes a validate log
    (GST_VALIDATE_FILE) that is parsed to follow its progress. """

    # Raw strings so that the regex escapes are not interpreted as string
    # escapes; the patterns themselves are unchanged.
    findpos_regex = re.compile(r'.*position.*(\d+):(\d+):(\d+).(\d+).*duration.*(\d+):(\d+):(\d+).(\d+)')
    findlastseek_regex = re.compile(r'seeking to.*(\d+):(\d+):(\d+).(\d+).*stop.*(\d+):(\d+):(\d+).(\d+).*rate.*(\d+)\.(\d+)')

    def __init__(self, application_name, classname,
                 options, reporter, timeout=DEFAULT_TIMEOUT,
                 scenario=None, hard_timeout=None):
        """
        @scenario: The scenario to execute, None or a scenario named "none"
                   (case insensitive) meaning that no scenario is set

        The other parameters are passed as is to Test.__init__
        """
        super(GstValidateTest, self).__init__(application_name, classname, options,
                                              reporter, timeout=timeout,
                                              hard_timeout=hard_timeout)

        # Timestamp at which "sending eos" was first seen in the logs,
        # see sent_eos_position()
        self._sent_eos_pos = None

        self.validatelogs = None
        if scenario is None or scenario.name.lower() == "none":
            self.scenario = None
        else:
            self.scenario = scenario

    def get_subproc_env(self):
        """
        Return a copy of the environment in which GST_VALIDATE_FILE (and
        GST_DEBUG_FILE when GST_DEBUG is set) point to files next to the
        main logfile. Those files are registered as extra log files.
        """
        subproc_env = os.environ.copy()

        self.validatelogs = self.logfile + '.validate.logs'
        utils.touch(self.validatelogs)
        subproc_env["GST_VALIDATE_FILE"] = self.validatelogs
        self.extra_logfiles.append(self.validatelogs)

        if 'GST_DEBUG' in os.environ:
            gstlogsfile = self.logfile + '.gstdebug'
            self.extra_logfiles.append(gstlogsfile)
            subproc_env["GST_DEBUG_FILE"] = gstlogsfile

        return subproc_env

    def clean(self):
        Test.clean(self)
        self._sent_eos_pos = None

    def build_arguments(self):
        """ Add the --set-scenario argument when a scenario is set. """
        if self.scenario is not None:
            self.add_arguments("--set-scenario",
                               self.scenario.get_execution_name())

    def get_extra_log_content(self, extralog):
        """ Same as Test.get_extra_log_content, with the position
        reports removed from the validate logs. """
        value = Test.get_extra_log_content(self, extralog)

        if extralog == self.validatelogs:
            value = re.sub("<position:.*/>\r", "", value)

        return value

    def _get_last_log_line(self, *needles):
        """ Return, lowercased, the last line of the validate logs that
        contains one of @needles, or None if there is no such line. """
        with open(self.validatelogs, 'r') as f:
            lines = f.readlines()

        for line in reversed(lines):
            line = line.lower()
            if any(needle in line for needle in needles):
                return line

        return None

    def get_validate_criticals_errors(self):
        """
        Return the distinct critical issues found in the validate logs
        formatted as "[issue1, issue2]", or "No critical" if there is none.
        """
        errors = []
        with open(self.validatelogs, 'r') as f:
            for line in f:
                if "critical : " not in line:
                    continue

                error = line.split("critical : ")[1].replace("\n", '')
                # Report each issue once; joining afterwards avoids the
                # dangling ", " separators duplicates used to produce.
                if error not in errors:
                    errors.append(error)

        if not errors:
            return "No critical"

        return "[" + ", ".join(errors) + "]"

    def check_results(self):
        if self.result is Result.FAILED or self.result is Result.PASSED:
            return

        self.debug("%s returncode: %s", self, self.process.returncode)
        if self.result == Result.TIMEOUT:
            self.set_result(Result.TIMEOUT, "Application timed out", "timeout")
        elif self.process.returncode == 0:
            self.set_result(Result.PASSED)
        elif self.process.returncode == 139:
            # FIXME Reimplement something like that if needed
            # self.get_backtrace("SEGFAULT")
            self.set_result(Result.FAILED,
                            "Application segfaulted",
                            "segfault")
        else:
            self.set_result(Result.FAILED,
                            "Application returned %s (issues: %s)" % (
                                self.process.returncode,
                                self.get_validate_criticals_errors()
                            ))

    def _parse_position(self, p):
        """ Return the (position, duration) found in the position report
        @p, or (0, 0) if it can not be parsed. """
        self.log("Parsing %s" % p)
        times = self.findpos_regex.findall(p)

        if len(times) != 1:
            self.warning("Got a unparsable value: %s" % p)
            return 0, 0

        return (utils.gsttime_from_tuple(times[0][:4]),
                utils.gsttime_from_tuple(times[0][4:]))

    def _parse_buffering(self, b):
        """ Return the buffering percentage (as the string found in the
        logs) as position, and 100 as duration. """
        return b.split("buffering... ")[1].split("%")[0], 100

    def _get_position(self):
        """ Return the last (position, duration) reported in the validate
        logs, (-1, -1) if nothing was reported yet. """
        position = duration = -1

        self.debug("Getting position")
        m = self._get_last_log_line("<position:", "buffering")
        if m is None:
            self.debug("Could not fine any positionning info")
            return position, duration

        # Reports are separated by carriage returns on a same line, the
        # last one that can be parsed wins.
        for j in m.split("\r"):
            j = j.strip()
            if j.startswith("<position:") and j.endswith("/>"):
                position, duration = self._parse_position(j)
            elif j.startswith("buffering") and j.endswith("%"):
                position, duration = self._parse_buffering(j)
            else:
                self.log("No info in %s" % j)

        return position, duration

    def _get_last_seek_values(self):
        """ Return the (start, stop, rate) of the last seek found in the
        validate logs, (None, None, None) if there is none. """
        rate = start = stop = None

        m = self._get_last_log_line("seeking to: ")
        if m is None:
            self.debug("Could not fine any seeking info")
            return start, stop, rate

        values = self.findlastseek_regex.findall(m)
        if len(values) != 1:
            # Report the line that could not be parsed (this used to
            # reference an undefined name and raise a NameError).
            self.warning("Got a unparsable value: %s" % m)
            return start, stop, rate

        v = values[0]
        return (utils.gsttime_from_tuple(v[:4]),
                utils.gsttime_from_tuple(v[4:8]),
                float(str(v[8]) + "." + str(v[9])))

    def sent_eos_position(self):
        """ Return the time.time() at which "sending eos" was first
        noticed in the validate logs, None if it is not there yet. """
        if self._sent_eos_pos is not None:
            return self._sent_eos_pos

        if self._get_last_log_line("sending eos") is not None:
            self._sent_eos_pos = time.time()

        return self._sent_eos_pos

    def get_current_position(self):
        """ Return the last reported position, -1 if there is none. """
        position, _ = self._get_position()
        return position

    def get_current_size(self):
        """ Return the size of the file self.dest_file points to, falling
        back to the current position if it can not be stat'ed. """
        position = self.get_current_position()

        # NOTE(review): dest_file is not set by this class, presumably
        # subclasses writing an output file define it -- to be confirmed.
        try:
            size = os.stat(urlparse.urlparse(self.dest_file).path).st_size
        except OSError:
            return position

        self.debug("Size: %s" % size)
        return size
class TestsManager(Loggable):

    """ Base class of the objects listing and running a set of tests. """

    name = ""

    def __init__(self):
        Loggable.__init__(self)

        self.tests = set()
        self.unwanted_tests = set()
        self.options = None
        self.args = None
        self.reporter = None
        self.wanted_tests_patterns = []
        self.blacklisted_tests_patterns = []

    def init(self):
        """ To be overridden: the base implementation returns False. """
        return False

    def list_tests(self):
        """ Return the set of tests to run. """
        return self.tests

    def add_test(self, test):
        """ File @test in the wanted or in the unwanted tests. """
        target = self.tests if self._is_test_wanted(test) else self.unwanted_tests
        target.add(test)

    def get_tests(self):
        """ Return the set of tests to run. """
        return self.tests

    def get_blacklisted(self):
        """ To be overridden: the base implementation returns an empty list. """
        return []

    def add_options(self, parser):
        """ To be overridden to add options to @parser. """
        pass

    def set_settings(self, options, args, reporter):
        """ Store the parsed options, arguments and the reporter, and
        compile the wanted / blacklisted test patterns from the options. """
        self.options = options
        self.args = args
        self.reporter = reporter

        def compile_into(destination, values):
            # Each value is a comma separated list of regular expressions
            for value in values:
                for expression in value.split(","):
                    destination.append(re.compile(expression))

        if options.wanted_tests:
            compile_into(self.wanted_tests_patterns, options.wanted_tests)

        if options.blacklisted_tests:
            compile_into(self.blacklisted_tests_patterns,
                         options.blacklisted_tests)

    def _check_blacklisted(self, test):
        """ Tell whether the classname of @test matches a blacklist pattern. """
        return any(regex.findall(test.classname)
                   for regex in self.blacklisted_tests_patterns)

    def _is_test_wanted(self, test):
        """ A test is wanted if it is not blacklisted and, when wanted
        patterns were given, if its classname matches one of them. """
        if self._check_blacklisted(test):
            return False

        if not self.wanted_tests_patterns:
            return True

        return any(regex.findall(test.classname)
                   for regex in self.wanted_tests_patterns)

    def run_tests(self, cur_test_num, total_num_tests):
        """ Run the tests, numbering them from @cur_test_num. Return the
        result of the first test not passing if the `forever` or
        `fatal_error` option is set, Result.PASSED otherwise. """
        stop_on_failure = lambda: (self.options.forever or
                                   self.options.fatal_error)

        for number, test in enumerate(self.tests, cur_test_num):
            sys.stdout.write("[%d / %d] " % (number, total_num_tests))
            self.reporter.before_test(test)
            outcome = test.run()
            self.reporter.after_test()
            if outcome != Result.PASSED and stop_on_failure():
                return test.result

        return Result.PASSED

    def clean_tests(self):
        """ Reset the state of all the tests. """
        for test in self.tests:
            test.clean()

    def needs_http_server(self):
        """ To be overridden: the base implementation returns False. """
        return False
2013-12-31 10:45:07 +00:00
class _TestsLauncher(Loggable):

    """ Instantiates the available TestsManager subclasses and drives
    them. """

    def __init__(self):
        Loggable.__init__(self)

        self.options = None
        self.testers = []
        self.tests = []
        self.reporter = None
        self._list_testers()
        self.wanted_tests_patterns = []

    def _list_testers(self):
        """ Execute the python files of the "apps" directory next to this
        file and keep an instance of each TestsManager subclass found whose
        init() returns True. """
        def get_subclasses(c, env):
            subclasses = []
            for value in env.values():
                try:
                    if issubclass(value, c) and value is not c:
                        subclasses.append(value)
                except TypeError:
                    # issubclass() raises for anything that is not a class
                    pass

            return subclasses

        env = globals().copy()
        d = os.path.dirname(__file__)
        for f in os.listdir(os.path.join(d, "apps")):
            if f.endswith(".py"):
                execfile(os.path.join(d, "apps", f), env)

        testers = [i() for i in get_subclasses(TestsManager, env)]
        for tester in testers:
            if tester.init() is True:
                self.testers.append(tester)
            else:
                # .get(): warning about a tester must not raise when PATH
                # is not set in the environment.
                self.warning("Can not init tester: %s -- PATH is %s"
                             % (tester.name, os.environ.get("PATH")))

    def add_options(self, parser):
        """ Add one option group per tester having options to @parser. """
        for tester in self.testers:
            group = OptionGroup(parser, "%s options" % tester.name,
                                "Options specific to the %s test manager"
                                % tester.name)
            tester.add_options(group)
            if group.option_list:
                parser.add_option_group(group)

    def set_settings(self, options, args):
        """ Create the reporter and the logs directory, then configure the
        testers. When tester names are found in @args only those testers
        are kept, and their names are removed from @args. """
        self.reporter = reporters.XunitReporter(options)
        mkdir(options.logsdir)

        self.options = options
        wanted_testers = None
        for tester in self.testers:
            if tester.name in args:
                wanted_testers = tester.name

        if wanted_testers:
            testers = self.testers
            self.testers = []
            for tester in testers:
                if tester.name in args:
                    self.testers.append(tester)
                    args.remove(tester.name)

        for tester in self.testers:
            tester.set_settings(options, args, self.reporter)

    def list_tests(self):
        """ Make each tester list its tests and return all of them. """
        for tester in self.testers:
            tester.list_tests()
            self.tests.extend(tester.tests)
        return self.tests

    def _run_tests(self):
        """ Run the tests of all the testers. Return False as soon as a
        tester does not pass while the `forever` or `fatal_error` option
        is set, True otherwise. """
        cur_test_num = 0
        total_num_tests = 0
        for tester in self.testers:
            total_num_tests += len(tester.list_tests())

        for tester in self.testers:
            res = tester.run_tests(cur_test_num, total_num_tests)
            cur_test_num += len(tester.list_tests())
            if res != Result.PASSED and (self.options.forever or
                                         self.options.fatal_error):
                return False

        return True

    def _clean_tests(self):
        for tester in self.testers:
            tester.clean_tests()

    def run_tests(self):
        """ Run the tests once, or with the `forever` option again and
        again until a run fails (False is then returned). """
        if self.options.forever:
            while self._run_tests():
                self._clean_tests()

            return False
        else:
            return self._run_tests()

    def final_report(self):
        self.reporter.final_report()

    def needs_http_server(self):
        """ Tell whether one of the testers needs an HTTP server. """
        for tester in self.testers:
            if tester.needs_http_server():
                return True

        # Always answer with a boolean rather than an implicit None
        return False

    def get_blacklisted(self):
        """ Return the blacklisted tests of all the testers. Entries given
        as a plain string are returned as a (name, "Unknown") tuple, the
        other ones are returned as is. """
        res = []
        for tester in self.testers:
            for blacklisted in tester.get_blacklisted():
                if isinstance(blacklisted, str):
                    # list.append() takes a single argument: the pair has
                    # to be added as one tuple.
                    res.append((blacklisted, "Unknown"))
                else:
                    res.append(blacklisted)
        return res
class NamedDic(object):

    """ Object exposing the entries of a dictionary as attributes. """

    def __init__(self, props):
        """
        @props: The entries to set as attributes, nothing is set when it
                is None or empty
        """
        if not props:
            return

        for entry in props.iteritems():
            attribute, content = entry
            setattr(self, attribute, content)
class Scenario(object):

    """ A scenario, its properties being exposed as attributes. """

    def __init__(self, name, props, path=None):
        """
        @name: The name of the scenario
        @props: Iterable of (property name, value) pairs, each of them is
                set as an attribute with '-' replaced by '_' in its name
        @path: The path of the scenario file if any
        """
        self.name = name
        self.path = path

        for prop, value in props:
            setattr(self, prop.replace("-", "_"), value)

    def get_execution_name(self):
        """ Return the path of the scenario if it has one, its name
        otherwise. """
        if self.path is not None:
            return self.path
        else:
            return self.name

    def seeks(self):
        """ Return the truthiness of the `seek` property, False if the
        scenario does not have it. """
        # NOTE(review): this is plain truthiness, any non empty string
        # (even "false") counts as True -- confirm the format of the values.
        if hasattr(self, "seek"):
            return bool(self.seek)

        return False

    def does_reverse_playback(self):
        """ Return the truthiness of the `reverse_playback` property, False
        if the scenario does not have it. """
        if hasattr(self, "reverse_playback"):
            # Read the property that was just checked: this used to return
            # bool(self.seek), which is unrelated and might not even exist.
            return bool(self.reverse_playback)

        return False

    def get_min_tracks(self, track_type):
        """ Return the `min_<track_type>_track` property as an integer,
        0 if the scenario does not have it. """
        try:
            return int(getattr(self, "min_%s_track" % track_type))
        except AttributeError:
            return 0
class ScenarioManager(Loggable):

    """ Singleton discovering the available scenarios by running the
    gst-validate command and parsing the definitions file it outputs. """

    _instance = None
    all_scenarios = []

    FILE_EXTENDION = "scenario"
    GST_VALIDATE_COMMAND = "gst-validate-1.0"
    if "win32" in sys.platform:
        GST_VALIDATE_COMMAND += ".exe"

    def __new__(cls, *args, **kwargs):
        """ Always return the same instance, created on the first call. """
        if cls._instance is None:
            # Arguments are not forwarded: object.__new__() does not take
            # any when __new__ is overridden.
            cls._instance = super(ScenarioManager, cls).__new__(cls)
            cls._instance.config = None
            cls._instance.discovered = False
            Loggable.__init__(cls._instance)

        return cls._instance

    def find_special_scenarios(self, mfile):
        """
        Return the scenarios specific to the media file @mfile: the ones
        defined by the "<mfile basename>.<name>.scenario" files found in
        the directory of @mfile.
        """
        scenarios = []
        mfile_dir = os.path.dirname(mfile)
        # The file name is to be matched literally, escape it so that
        # characters such as '+' or '(' are not interpreted as regex syntax.
        pattern = re.compile(r"%s\..*\.%s$" % (
            re.escape(os.path.basename(mfile)), self.FILE_EXTENDION))

        # os.listdir() does not accept the empty directory name of a bare
        # file name.
        for f in os.listdir(mfile_dir or "."):
            if pattern.findall(f):
                scenarios.append(os.path.join(mfile_dir, f))

        if scenarios:
            scenarios = self.discover_scenarios(scenarios, mfile)

        return scenarios

    def discover_scenarios(self, scenario_paths=None, mfile=None):
        """
        Discover scenarios specified in scenario_paths or the default ones
        if nothing specified there

        @scenario_paths: Paths of the scenario files to discover
        @mfile: The media file the scenarios of @scenario_paths are
                specific to, used to compute their name

        Returns the list of discovered Scenario. They are also added to
        all_scenarios when no path was specified.
        """
        if scenario_paths is None:
            scenario_paths = []

        scenarios = []
        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
        try:
            command = [self.GST_VALIDATE_COMMAND, "--scenarios-defs-output-file", scenario_defs]
            command.extend(scenario_paths)
            subprocess.check_output(command)
        except subprocess.CalledProcessError:
            pass

        config = ConfigParser.ConfigParser()
        with open(scenario_defs) as f:
            config.readfp(f)

        for section in config.sections():
            # Default to the section itself so that a section matching none
            # of the paths never reuses the values of the previous section.
            name = section
            path = None
            for scenario_path in scenario_paths:
                if section in scenario_path:
                    # The real name of the scenario is:
                    # filename.REALNAME.scenario
                    name = scenario_path
                    if mfile is not None:
                        name = name.replace(mfile + ".", "")
                    name = name.replace("." + self.FILE_EXTENDION, "")
                    path = scenario_path

            scenarios.append(Scenario(name, config.items(section), path))

        if not scenario_paths:
            self.discovered = True
            self.all_scenarios.extend(scenarios)

        return scenarios

    def get_scenario(self, name):
        """
        Return the scenario called @name, None (with a warning) if there is
        no such scenario, or the list of all the scenarios if @name is None.
        The default scenarios are discovered first if needed.
        """
        if self.discovered is False:
            self.discover_scenarios()

        if name is None:
            return self.all_scenarios

        for scenario in self.all_scenarios:
            if scenario.name == name:
                return scenario

        self.warning("Scenario: %s not found" % name)
        return None