From b4a2ca62860b5b85ddd345a9f5f04ca266769beb Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Tue, 31 Dec 2013 11:45:07 +0100 Subject: [PATCH] Add a test launcher tool --- validate/tools/apps/ges-projects-tests.py | 353 ++++++++++++++++++++++ validate/tools/gst-validate-launcher.py | 51 ++++ validate/tools/reporters.py | 183 +++++++++++ validate/tools/testdefinitions.py | 227 ++++++++++++++ validate/tools/utils.py | 63 ++++ 5 files changed, 877 insertions(+) create mode 100644 validate/tools/apps/ges-projects-tests.py create mode 100644 validate/tools/gst-validate-launcher.py create mode 100644 validate/tools/reporters.py create mode 100644 validate/tools/testdefinitions.py create mode 100644 validate/tools/utils.py diff --git a/validate/tools/apps/ges-projects-tests.py b/validate/tools/apps/ges-projects-tests.py new file mode 100644 index 0000000000..54fded71a2 --- /dev/null +++ b/validate/tools/apps/ges-projects-tests.py @@ -0,0 +1,353 @@ +#!/usr//bin/python +# +# Copyright (c) 2013,Thibault Saunier +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, write to the +# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +# Boston, MA 02110-1301, USA. + +import os +import time +from urllib import unquote +from urlparse import urlsplit +from utils import launch_command +from gi.repository import GES, Gst, GLib + +DURATION_TOLERANCE = Gst.SECOND / 2 +DEFAULT_GES_LAUNCH = "ges-launch-1.0" + + +class Combination(object): + + def __str__(self): + return "%s and %s in %s" % (self.acodec, self.vcodec, self.container) + + def __init__(self, container, acodec, vcodec): + self.container = container + self.acodec = acodec + self.vcodec = vcodec + + +FORMATS = {"aac": "audio/mpeg,mpegversion=4", + "ac3": "audio/x-ac3", + "vorbis": "audio/x-vorbis", + "mp3": "audio/mpeg,mpegversion=1,layer=3", + "h264": "video/x-h264", + "vp8": "video/x-vp8", + "theora": "video/x-theora", + "ogg": "application/ogg", + "mkv": "video/x-matroska", + "mp4": "video/quicktime,variant=iso;", + "webm": "video/x-matroska"} + +COMBINATIONS = [ + Combination("ogg", "vorbis", "theora"), + Combination("webm", "vorbis", "vp8"), + Combination("mp4", "mp3", "h264"), + Combination("mkv", "vorbis", "h264")] + + +SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"] + + +def get_profile_full(muxer, venc, aenc, video_restriction=None, + audio_restriction=None, + audio_presence=0, video_presence=0): + ret = "\"" + if muxer: + ret += muxer + ret += ":" + if venc: + if video_restriction is not None: + ret = ret + video_restriction + '->' + ret += venc + if video_presence: + ret = ret + '|' + str(video_presence) + if aenc: + ret += ":" + if audio_restriction is not None: + ret = ret + audio_restriction + '->' + ret += aenc + if audio_presence: + ret = ret + '|' + str(audio_presence) + + ret += "\"" + return ret.replace("::", ":") + + + +def get_profile(combination): + return get_profile_full(FORMATS[combination.container], + FORMATS[combination.vcodec], + FORMATS[combination.acodec], + video_restriction="video/x-raw,format=I420") + + +def quote_uri(uri): + """ + Encode a URI/path according to RFC 2396, without touching the file:/// part. + """ + # Split off the "file:///" part, if present. + parts = urlsplit(uri, allow_fragments=False) + # Make absolutely sure the string is unquoted before quoting again! + raw_path = unquote(parts.path) + # For computing thumbnail md5 hashes in the media library, we must adhere to + # RFC 2396. It is quite tricky to handle all corner cases, leave it to Gst: + return Gst.filename_to_uri(raw_path) + + +class GESTest(Test): + def __init__(self, classname, options, reporter, project_uri, scenario, + combination=None): + super(GESTest, self).__init__(DEFAULT_GES_LAUNCH, classname, options, reporter) + self.scenario = scenario + self.project_uri = project_uri + self.combination = combination + proj = GES.Project.new(project_uri) + tl = proj.extract() + if tl is None: + self.duration = None + else: + self.duration = tl.get_meta("duration") + if self.duration is not None: + self.duration = self.duration / Gst.SECOND + else: + self.duration = 2 * 60 + + def set_rendering_info(self): + self.dest_file = os.path.join(self.options.dest, + os.path.basename(self.project_uri) + + '-' + self.combination.acodec + + self.combination.vcodec + '.' + + self.combination.container) + if not Gst.uri_is_valid(self.dest_file): + self.dest_file = GLib.filename_to_uri(self.dest_file, None) + + profile = get_profile(self.combination) + self.add_arguments("-f", profile, "-o", self.dest_file) + + def set_sample_paths(self): + if not self.options.paths: + if not self.options.recurse_paths: + return + paths = [os.path.dirname(Gst.uri_get_location(self.project_uri))] + else: + paths = self.options.paths + + for path in paths: + if self.options.recurse_paths: + self.add_arguments("--sample-paths", quote_uri(path)) + for root, dirs, files in os.walk(path): + for directory in dirs: + self.add_arguments("--sample-paths", + quote_uri(os.path.join(path, + root, + directory) + ) + ) + else: + self.add_arguments("--sample-paths", "file://" + path) + + def build_arguments(self): + print "\OOO %s" % self.combination + if self.scenario is not None: + self.add_arguments("--set-scenario", self.scenario) + if self.combination is not None: + self.set_rendering_info() + + if self.options.mute: + self.add_arguments(" --mute") + + self.set_sample_paths() + self.add_arguments("-l", self.project_uri) + + def check_results(self): + if self.process.returncode == 0: + if self.combination: + try: + asset = GES.UriClipAsset.request_sync(self.dest_file) + if self.duration - DURATION_TOLERANCE <= asset.get_duration() \ + <= self.duration + DURATION_TOLERANCE: + self.set_result(Result.FAILURE, "Duration of encoded file is " + " wrong (%s instead of %s)" % + (Gst.TIME_ARGS(self.duration), + Gst.TIME_ARGS(asset.get_duration())), + "wrong-duration") + else: + self.set_result(Result.PASSED) + except GLib.Error as e: + self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e) + + else: + self.set_result(Result.PASSED) + else: + if self.combination and self.result == Result.TIMEOUT: + missing_eos = False + try: + asset = GES.UriClipAsset.request_sync(self.dest_file) + if asset.get_duration() == self.duration: + missing_eos = True + except Exception as e: + pass + + if missing_eos is True: + self.set_result(Result.TIMEOUT, "The rendered file add right duration, MISSING EOS?\n", + "failure", e) + else: + if self.result == Result.TIMEOUT: + self.set_result(Result.TIMEOUT, "Application timed out", "timeout") + else: + if self.process.returncode == 139: + self.get_backtrace("SEGFAULT") + self.set_result("Application segfaulted") + else: + self.set_result(Result.FAILED, + "Application returned %d (issues: %s)" % ( + self.process.returncode, + self.get_validate_criticals_errors()), + "error") + + + def wait_process(self): + last_val = 0 + last_change_ts = time.time() + while True: + self.process.poll() + if self.process.returncode is not None: + self.check_results() + break + + # Dirty way to avoid eating to much CPU... good enough for us anyway. + time.sleep(1) + + if self.combination: + val = os.stat(GLib.filename_from_uri(self.dest_file)[0]).st_size + else: + val = self.get_last_position() + + if val == last_val: + if time.time() - last_change_ts > 10: + self.result = Result.TIMEOUT + else: + last_change_ts = time.time() + last_val = val + + + def get_last_position(self): + self.reporter.out.seek(0) + m = None + for l in self.reporter.out.readlines(): + if ""): + pos = j + + return pos + + +class GESTestsManager(TestsManager): + def __init__(self): + super(GESTestsManager, self).__init__() + Gst.init(None) + GES.init() + + default_opath = GLib.get_user_special_dir( + GLib.UserDirectory.DIRECTORY_VIDEOS) + if default_opath: + self.default_path = os.path.join(default_opath, "ges-projects") + else: + self.default_path = os.path.join(os.path.expanduser('~'), "Video", + "ges-projects") + + def add_options(self, parser): + parser.add_option("-o", "--output-path", dest="dest", + default=os.path.join(self.default_path, "rendered"), + help="Set the path to which projects should be" + " renderd") + parser.add_option("-P", "--sample-path", dest="paths", + default=[], + help="Paths in which to look for moved assets") + parser.add_option("-r", "--recurse-paths", dest="recurse_paths", + default=False, action="store_true", + help="Whether to recurse into paths to find assets") + parser.add_option("-m", "--mute", dest="mute", + action="store_true", default=False, + help="Mute playback output, which mean that we use " + "a fakesink") + + + def set_settings(self, options, args, reporter): + TestsManager.set_settings(self, options, args, reporter) + if not args and not os.path.exists(self.default_path): + launch_command("git clone %s" % DEFAULT_ASSET_REPO, + "Getting assets") + + if not Gst.uri_is_valid(options.dest): + options.dest = GLib.filename_to_uri(options.dest, None) + + try: + os.makedirs(GLib.filename_from_uri(options.dest)[0]) + print "Created directory: %s" % options.dest + except OSError: + pass + + def list_tests(self): + projects = list() + if not self.args: + self.options.paths = [os.path.join(self.default_path, "assets")] + path = os.path.join(self.default_path, "projects") + for root, dirs, files in os.walk(path): + for f in files: + if not f.endswith(".xges"): + continue + + projects.append(GLib.filename_to_uri(os.path.join(path, + root, + f), + None)) + else: + for proj in self.args: + if Gst.uri_is_valid(proj): + projects.append(proj) + else: + projects.append(GLib.filename_to_uri(proj, None)) + + for proj in projects: + # First playback casses + for scenario in SCENARIOS: + classname = "ges.playback.%s.%s" % (scenario, os.path.basename(proj).replace(".xges", "")) + self.tests.append(GESTest(classname, + self.options, + self.reporter, + proj, + scenario) + ) + + # And now rendering casses + for comb in COMBINATIONS: + classname = "ges.render.%s.%s" % (str(comb).replace(' ', '_'), + os.path.basename(proj).replace(".xges", "")) + self.tests.append(GESTest(classname, + self.options, + self.reporter, + proj, + None, + comb) + ) diff --git a/validate/tools/gst-validate-launcher.py b/validate/tools/gst-validate-launcher.py new file mode 100644 index 0000000000..23cf52412a --- /dev/null +++ b/validate/tools/gst-validate-launcher.py @@ -0,0 +1,51 @@ +#!/usr//bin/python +import os +from testdefinitions import _TestsLauncher +from optparse import OptionParser + +def main(): + parser = OptionParser() + parser.add_option("-g", "--gdb", dest="gdb", + action="store_true", + default=False, + help="Run applications into gdb") + parser.add_option("-f", "--forever", dest="forever", + action="store_true", default=False, + help="Keep running tests until one fails") + parser.add_option("-F", "--fatal-error", dest="fatal_error", + action="store_true", default=False, + help="Stop on first fail") + parser.add_option('--xunit-file', action='store', + dest='xunit_file', metavar="FILE", + default=None, + help=("Path to xml file to store the xunit report in. " + "Default is xunit.xml the logs-dir directory")) + parser.add_option("-t", "--wanted-tests", dest="wanted_tests", + default=None, + help="Define the tests to execute, it can be a regex") + parser.add_option("-L", "--list-tests", + dest="list_tests", + action="store_true", + default=False, + help="List tests and exit") + parser.add_option("-l", "--logs-dir", dest="logsdir", + action="store_true", default=os.path.expanduser("~/gst-validate/logs/"), + help="Directory where to store logs") + + tests_launcher = _TestsLauncher() + tests_launcher.add_options(parser) + (options, args) = parser.parse_args() + if options.xunit_file is None: + options.xunit_file = os.path.join(options.logsdir, "xunit.xml") + tests_launcher.set_settings(options, args) + tests_launcher.list_tests() + if options.list_tests: + for test in tests_launcher.tests: + print test + return 0 + tests_launcher.run_tests() + tests_launcher.final_report() + return 0 + +if "__main__" == __name__: + exit(main()) diff --git a/validate/tools/reporters.py b/validate/tools/reporters.py new file mode 100644 index 0000000000..1917856815 --- /dev/null +++ b/validate/tools/reporters.py @@ -0,0 +1,183 @@ +#!/usr/bin/python +# +# Copyright (c) 2013,Thibault Saunier +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, write to the +# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +# Boston, MA 02110-1301, USA. + +""" Test Reporters implementation. """ + +import os +import re +import codecs +import testdefinitions +from xml.sax import saxutils +from utils import mkdir, Result + +UNICODE_STRINGS = (type(unicode()) == type(str())) + + +class UnknownResult(Exception): + pass + + +CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]") + + +def xml_safe(value): + """Replaces invalid XML characters with '?'.""" + return CONTROL_CHARACTERS.sub('?', value) + + +def escape_cdata(cdata): + """Escape a string for an XML CDATA section.""" + return xml_safe(cdata).replace(']]>', ']]>]]>' % \ + escape_cdata(value) + return '' + + def _quoteattr(self, attr): + """Escape an XML attribute. Value can be unicode.""" + attr = xml_safe(attr) + if isinstance(attr, unicode) and not UNICODE_STRINGS: + attr = attr.encode(self.encoding) + return saxutils.quoteattr(attr) + + def report(self): + """Writes an Xunit-formatted XML file + + The file includes a report of test errors and failures. + + """ + print "Writing XML file to: %s" % self.options.xunit_file + self.xml_file = codecs.open(self.options.xunit_file, 'w', + self.encoding, 'replace') + self.stats['encoding'] = self.encoding + self.stats['total'] = (self.stats['timeout'] + self.stats['failures'] + + self.stats['passes'] + self.stats['skipped']) + print self.stats + self.xml_file.write( u'' + u'' % self.stats) + self.xml_file.write(u''.join([self._forceUnicode(e) + for e in self.errorlist])) + self.xml_file.write(u'') + self.xml_file.close() + + def set_failed(self, test): + """Add failure output to Xunit report. + """ + self.stats['failures'] += 1 + self.results.insert(0, test) + self.errorlist.append( + '' + '' + '%(systemout)s' % + {'cls': self._quoteattr(test.classname), + 'name': self._quoteattr(test.classname.split('.')[-1]), + 'taken': test.time_taken, + 'errtype': self._quoteattr(test.result), + 'message': self._quoteattr(test.message), + 'systemout': self._get_captured(), + }) + + def set_passed(self, test): + """Add success output to Xunit report. + """ + self.stats['passes'] += 1 + self.results.append(test) + self.errorlist.append( + '%(systemout)s' % + {'cls': self._quoteattr(test.classname), + 'name': self._quoteattr(test.classname.split('.')[-1]), + 'taken': test.time_taken, + 'systemout': self._get_captured(), + }) + + def _forceUnicode(self, s): + if not UNICODE_STRINGS: + if isinstance(s, str): + s = s.decode(self.encoding, 'replace') + return s diff --git a/validate/tools/testdefinitions.py b/validate/tools/testdefinitions.py new file mode 100644 index 0000000000..7771624c1e --- /dev/null +++ b/validate/tools/testdefinitions.py @@ -0,0 +1,227 @@ +#!/usr/bin/python +# +# Copyright (c) 2013,Thibault Saunier +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, write to the +# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +# Boston, MA 02110-1301, USA. + +""" Class representing tests and test managers. """ + +import os +import re +import time +import subprocess +import reporters + +from utils import mkdir, Result + +DEFAULT_TIMEOUT = 10 + + +class Test(object): + + """ A class representing a particular test. """ + + def __init__(self, application_name, classname, options, reporter): + self.timeout = DEFAULT_TIMEOUT + self.classname = classname + self.options = options + self.application = application_name + self.command = "" + self.reporter = reporter + self.process = None + + self.message = None + self.error = None + self.time_taken = None + self._starting_time = None + self.result = Result.NOT_RUN + + def __str__(self): + string = self.classname + if self.result: + string += ": " + self.result + if "FAILED" in self.result: + string += "\n You can reproduce with: " + self.command + + return string + + def add_arguments(self, *args): + for arg in args: + self.command += " " + arg + + def build_arguments(self): + pass + + def set_result(self, result, message=None, error=None): + print "SETTING TER" + self.result = result + self.message = message + self.error = error + + def check_results(self): + if self.process.returncode == 0: + self.result = Result.PASSED + + self.result = Result.FAILED + + def wait_process(self): + last_change_ts = time.time() + while True: + self.process.poll() + if self.process.returncode is not None: + break + + if time.time() - last_change_ts > self.timeout: + self.result = Result.TIMEOUT + + # Dirty way to avoid eating to much CPU... + # good enough for us anyway. + time.sleep(1) + + self.check_results() + + def get_validate_criticals_errors(self): + self.reporter.out.seek(0) + ret = "[" + for l in self.reporter.out.readlines(): + if "critical : " in l: + if ret != "[": + ret += ", " + ret += l.split("critical : ")[1].replace("\n", '') + + if ret == "[": + return "No critical" + else: + return ret + "]" + + def run(self): + self.command = "%s " % (self.application) + self._starting_time = time.time() + self.build_arguments() + print "Launching %s" % self.command + try: + self.process = subprocess.Popen(self.command, + stderr=self.reporter.out, + stdout=self.reporter.out, + shell=True) + self.wait_process() + except KeyboardInterrupt: + self.process.kill() + raise + + try: + self.process.terminate() + except OSError: + pass + + self.time_taken = time.time() - self._starting_time + + +class TestsManager(object): + + """ A class responsible for managing tests. """ + + def __init__(self): + self.tests = [] + self.options = None + self.args = None + self.reporter = None + self.wanted_tests_patterns = [] + + def list_tests(self): + pass + + def get_tests(self): + return self.tests + + def add_options(self, parser): + """ Add more arguments. """ + pass + + def set_settings(self, options, args, reporter): + """ Set properties after options parsing. """ + self.options = options + self.args = args + self.reporter = reporter + + if options.wanted_tests: + for pattern in options.wanted_tests.split(','): + self.wanted_tests_patterns.append(re.compile(pattern)) + + + def _is_test_wanted(self, test): + for pattern in self.wanted_tests_patterns: + if pattern.findall(test.classname): + return True + + return False + + def run_tests(self): + for test in self.tests: + if self._is_test_wanted(test): + self.reporter.before_test(test) + test.run() + self.reporter.after_test() + + +class _TestsLauncher(object): + def __init__(self): + self.testers = [] + self.tests = [] + self.reporter = None + self._list_testers() + self.wanted_tests_patterns = [] + + def _list_testers(self): + def get_subclasses(c, env): + subclasses = [] + for symb in env.iteritems(): + try: + if issubclass(symb[1], c): + subclasses.append(symb[1]) + except TypeError: + pass + + return subclasses + + env = globals().copy() + d = os.path.dirname(__file__) + for f in os.listdir(os.path.join(d, "apps")): + execfile(os.path.join(d, "apps", f), env) + self.testers = [i() for i in get_subclasses(TestsManager, env)] + print self.testers + + def add_options(self, parser): + for tester in self.testers: + tester.add_options(parser) + + def set_settings(self, options, args): + self.reporter = reporters.XunitReporter(options) + mkdir(options.logsdir) + for tester in self.testers: + tester.set_settings(options, args, self.reporter) + + def list_tests(self): + for tester in self.testers: + tester.list_tests() + self.tests.extend(tester.tests) + + def run_tests(self): + for tester in self.testers: + tester.run_tests() + + def final_report(self): + self.reporter.final_report() diff --git a/validate/tools/utils.py b/validate/tools/utils.py new file mode 100644 index 0000000000..a7eee76fd6 --- /dev/null +++ b/validate/tools/utils.py @@ -0,0 +1,63 @@ +#!/usr/bin/python +# +# Copyright (c) 2013,Thibault Saunier +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, write to the +# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +# Boston, MA 02110-1301, USA. +""" Some utilies. """ + +import os + + +class Result(object): + NOT_RUN = "Not run" + FAILED = "Failed" + TIMEOUT = "Timeout" + PASSED = "Passed" + + +class Colors(object): + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + + +def desactivate_colors(self): + Colors.HEADER = '' + Colors.OKBLUE = '' + Colors.OKGREEN = '' + Colors.WARNING = '' + Colors.FAIL = '' + Colors.ENDC = '' + + +def mkdir(directory): + try: + os.makedirs(directory) + except os.error: + pass + + +def launch_command(command, name="", color=None): + if name != "": + if color is not None: + print "%s%s" % (color, len(name) * "=") + print name + if color is not None: + print "%s%s" % (len(name) * "=", Colors.ENDC) + os.system(command)