validate:launcher: Port to Python3

And sync logging.py with Pitivi version
This commit is contained in:
Thibault Saunier 2016-11-04 18:04:37 -03:00
parent cf1404814a
commit 1e51aeb942
14 changed files with 483 additions and 670 deletions

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
import os
import subprocess
import sys
@ -68,13 +68,13 @@ def main():
break
if output_message:
print output_message
print(output_message)
if non_compliant_files:
print NOT_PEP8_COMPLIANT_MESSAGE_POST
print(NOT_PEP8_COMPLIANT_MESSAGE_POST)
for non_compliant_file in non_compliant_files:
print "autopep8 -i ", non_compliant_file, "; git add ", \
non_compliant_file
print "git commit"
print("autopep8 -i ", non_compliant_file, "; git add ",
non_compliant_file)
print("git commit")
sys.exit(1)

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
# Portions Copyright (C) 2009,2010 Xyne
# Portions Copyright (C) 2011 Sean Goller
@ -34,24 +34,21 @@ __all__ = ["RangeHTTPRequestHandler"]
import os
import sys
import posixpath
import BaseHTTPServer
from SocketServer import ThreadingMixIn
import urllib
import cgi
import http.server
import urllib.parse
import html
import shutil
import mimetypes
import io
import time
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
_bandwidth = 0
class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
class RangeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
"""Simple HTTP request handler with GET and HEAD commands.
@ -69,7 +66,7 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self):
"""Serve a GET request."""
f, start_range, end_range = self.send_head()
print "Got values of ", start_range, " and ", end_range, "...\n"
print ("Got values of {} and {}".format(start_range, end_range))
if f:
f.seek(start_range, 0)
chunk = 0x1000
@ -110,13 +107,13 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
path = self.translate_path(self.path)
f = None
if os.path.isdir(path):
if not self.path.endswith('/'):
# redirect browser - doing basically what apache does
if not self.path.endswith("/"):
# redirect browser
self.send_response(301)
self.send_header("Location", self.path + "/")
self.end_headers()
return (None, 0, 0)
for index in "index.html", "index.htm":
for index in "index.html", "index.html":
index = os.path.join(path, index)
if os.path.exists(index):
path = index
@ -124,41 +121,45 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
else:
return self.list_directory(path)
ctype = self.guess_type(path)
try:
# Always read in binary mode. Opening files in text mode may cause
# newline translations, making the actual size of the content
# transmitted *less* than the content-length!
f = open(path, 'rb')
f = open(path, "rb")
except IOError:
self.send_error(404, "File not found")
return (None, 0, 0)
if "Range" in self.headers:
self.send_response(206)
self.send_response(206) #partial content response
else :
self.send_response(200)
self.send_header("Content-type", ctype)
fs = os.fstat(f.fileno())
size = int(fs[6])
file_size = os.path.getsize(path)
start_range = 0
end_range = size
end_range = file_size
self.send_header("Accept-Ranges", "bytes")
if "Range" in self.headers:
s, e = self.headers['range'][6:].split('-', 1)
s, e = self.headers['range'][6:].split('-', 1) #bytes:%d-%d
sl = len(s)
el = len(e)
if sl > 0:
if sl:
start_range = int(s)
if el > 0:
if el:
end_range = int(e) + 1
elif el > 0:
ei = int(e)
if ei < size:
start_range = size - ei
self.send_header("Content-Range", 'bytes ' + str(
start_range) + '-' + str(end_range - 1) + '/' + str(size))
elif el:
start_range = file_size - min(file_size, int(e))
self.send_header("Content-Range", "bytes {}-{}/{}".format(start_range, end_range, file_size))
self.send_header("Content-Length", end_range - start_range)
self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
self.end_headers()
print ("Sending bytes {} to {}...".format(start_range, end_range))
return (f, start_range, end_range)
def list_directory(self, path):
@ -170,37 +171,47 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""
try:
list = os.listdir(path)
except os.error:
self.send_error(404, "No permission to list directory")
lst = os.listdir(path)
except OSError:
self.send_error(404, "Access Forbidden")
return None
list.sort(key=lambda a: a.lower())
f = StringIO()
displaypath = cgi.escape(urllib.unquote(self.path))
f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
f.write("<html>\n<title>Directory listing for %s</title>\n" %
displaypath)
f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
f.write("<hr>\n<ul>\n")
for name in list:
lst.sort(key=lambda file_name : file_name.lower())
html_text = []
displaypath = html.escape(urllib.parse.unquote(self.path))
html_text.append('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
html_text.append("<html>\n<title>Directory listing for {}</title>\n".format(displaypath))
html_text.append("<body>\n<h2>Directory listing for {}</h2>\n".format(displaypath))
html_text.append("<hr>\n<ul>\n")
for name in lst:
fullname = os.path.join(path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + "/"
linkname = name + "/"
if os.path.islink(fullname):
displayname = name + "@"
# Note: a link to a directory displays with @ and links with /
f.write('<li><a href="%s">%s</a>\n'
% (urllib.quote(linkname), cgi.escape(displayname)))
f.write("</ul>\n<hr>\n</body>\n</html>\n")
length = f.tell()
html_text.append('<li><a href = "{}">{}</a>\n'.format(urllib.parse.quote(linkname), html.escape(displayname)))
html_text.append('</ul>\n</hr>\n</body>\n</html>\n')
byte_encoded_string = "\n".join(html_text).encode("utf-8", "surrogateescape")
f = io.BytesIO()
f.write(byte_encoded_string)
length = len(byte_encoded_string)
f.seek(0)
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Content-Length", str(length))
self.send_header("Content-length", str(length))
self.end_headers()
return (f, 0, length)
def translate_path(self, path):
@ -212,35 +223,20 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""
#abandon query parameters
path = path.split('?', 1)[0]
path = path.split('#', 1)[0]
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
path = path.split("?", 1)[0]
path = path.split("#", 1)[0]
path = posixpath.normpath(urllib.parse.unquote(path))
words = path.split("/")
words = filter(None, words)
path = os.getcwd()
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir):
continue
if word in (os.curdir, os.pardir): continue
path = os.path.join(path, word)
return path
def copyfile(self, source, outputfile):
"""Copy all data between two file objects.
The SOURCE argument is a file object open for reading
(or anything with a read() method) and the DESTINATION
argument is a file object open for writing (or
anything with a write() method).
The only reason for overriding this would be to change
the block size or perhaps to replace newlines by CRLF
-- note however that this the default server uses this
to copy binary data as well.
"""
shutil.copyfileobj(source, outputfile)
def guess_type(self, path):
"""Guess the type of a file.
@ -258,37 +254,31 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""
base, ext = posixpath.splitext(path)
if ext in self.extensions_map:
return self.extensions_map[ext]
if ext in self.extension_map:
return self.extension_map[ext]
ext = ext.lower()
if ext in self.extensions_map:
return self.extensions_map[ext]
if ext in self.extension_map:
return self.extension_map[ext]
else:
return self.extensions_map['']
return self.extension_map['']
if not mimetypes.inited:
mimetypes.init() # try to read system mime.types
extensions_map = mimetypes.types_map.copy()
extensions_map.update({
mimetypes.init()
extension_map = mimetypes.types_map.copy()
extension_map.update({
'': 'application/octet-stream', # Default
'.py': 'text/plain',
'.c': 'text/plain',
'.h': 'text/plain',
'.mp4': 'video/mp4',
'.ogg': 'video/ogg',
'.java' : 'text/plain',
})
class ThreadedHTTPServer(ThreadingMixIn, BaseHTTPServer.HTTPServer):
"""Handle requests in a separate thread."""
def test(handler_class = RangeHTTPRequestHandler,server_class = http.server.HTTPServer):
http.server.test(handler_class, server_class)
def test(HandlerClass=RangeHTTPRequestHandler,
ServerClass=ThreadedHTTPServer):
BaseHTTPServer.test(HandlerClass, ServerClass)
if __name__ == '__main__':
if len(sys.argv) > 2:
_bandwidth = int(sys.argv[2])
test()
if __name__ == "__main__":
httpd = http.server.HTTPServer(("0.0.0.0", int(sys.argv[1])), RangeHTTPRequestHandler)
httpd.serve_forever()

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2014,Thibault Saunier <thibault.saunier@collabora.com>
#

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
@ -19,9 +19,9 @@
import argparse
import os
import time
import urlparse
import urllib.parse
import subprocess
import ConfigParser
import configparser
from launcher.loggable import Loggable
from launcher.baseclasses import GstValidateTest, Test, \
@ -338,7 +338,7 @@ class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
self.mixed_srcs[name] = tuple(srcs)
for name, srcs in self.mixed_srcs.iteritems():
for name, srcs in self.mixed_srcs.items():
if isinstance(srcs, dict):
pipe_arguments = {
"mixer": self.mixer + " %s" % srcs["mixer_props"]}
@ -454,7 +454,7 @@ class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterfa
extra_env_variables = extra_env_variables or {}
file_dur = long(media_descriptor.get_duration()) / GST_SECOND
file_dur = int(media_descriptor.get_duration()) / GST_SECOND
if not media_descriptor.get_num_tracks("video"):
self.debug("%s audio only file applying transcoding ratio."
"File 'duration' : %s" % (classname, file_dur))
@ -490,8 +490,8 @@ class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterfa
self.dest_file = os.path.join(self.options.dest,
self.classname.replace(".transcode.", os.sep).
replace(".", os.sep))
mkdir(os.path.dirname(urlparse.urlsplit(self.dest_file).path))
if urlparse.urlparse(self.dest_file).scheme == "":
mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path))
if urllib.parse.urlparse(self.dest_file).scheme == "":
self.dest_file = path2url(self.dest_file)
profile = self.get_profile()
@ -647,7 +647,7 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
uri.startswith("http://127.0.0.1:8079/"):
uri = uri.replace("http://127.0.0.1:8079/",
"http://127.0.0.1:%r/" % self.options.http_server_port, 1)
media_descriptor.set_protocol(urlparse.urlparse(uri).scheme)
media_descriptor.set_protocol(urllib.parse.urlparse(uri).scheme)
for caps2, prot in GST_VALIDATE_CAPS_TO_PROTOCOL:
if caps2 == caps:
media_descriptor.set_protocol(prot)
@ -660,7 +660,7 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
NamedDic({"path": media_info,
"media_descriptor": media_descriptor}),
special_scenarios))
except ConfigParser.NoOptionError as e:
except configparser.NoOptionError as e:
self.debug("Exception: %s for %s", e, media_info)
def _discover_file(self, uri, fpath):

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
@ -24,22 +24,22 @@ import os
import sys
import re
import copy
import SocketServer
import socketserver
import struct
import time
import utils
from . import utils
import signal
import urlparse
import urllib.parse
import subprocess
import threading
import Queue
import reporters
import ConfigParser
import loggable
from loggable import Loggable
import queue
from . import reporters
import configparser
from . import loggable
from .loggable import Loggable
import xml.etree.cElementTree as ET
from utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
Protocols, look_for_file_in_source_dir, get_data_file
# The factor by which we increase the hard timeout when running inside
@ -208,8 +208,8 @@ class Test(Loggable):
self.process.communicate()
else:
pname = subprocess.check_output(("readlink -e /proc/%s/exe"
% self.process.pid).split(' ')).replace('\n', '')
raw_input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
% self.process.pid).decode().split(' ')).replace('\n', '')
input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
"Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
Colors.ENDC))
@ -354,7 +354,7 @@ class Test(Loggable):
for supp in self.get_valgrind_suppressions():
vg_args.append(('suppressions', supp))
self.command = "valgrind %s %s" % (' '.join(map(lambda x: '--%s=%s' % (x[0], x[1]), vg_args)),
self.command = "valgrind %s %s" % (' '.join(['--%s=%s' % (x[0], x[1]) for x in vg_args]),
self.command)
# Tune GLib's memory allocator to be more valgrind friendly
@ -387,7 +387,7 @@ class Test(Loggable):
self.build_arguments()
self.proc_env = self.get_subproc_env()
for var, value in self.extra_env_variables.items():
for var, value in list(self.extra_env_variables.items()):
value = self.proc_env.get(var, '') + os.pathsep + value
self.proc_env[var] = value.strip(os.pathsep)
self.add_env_variable(var, self.proc_env[var])
@ -428,7 +428,7 @@ class Test(Loggable):
printc(message, Colors.FAIL)
with open(logfile, 'r') as fin:
print fin.read()
print(fin.read())
def _dump_log_files(self):
printc("Dumping log files on failure\n", Colors.FAIL)
@ -454,15 +454,15 @@ class Test(Loggable):
return self.result
class GstValidateListener(SocketServer.BaseRequestHandler):
class GstValidateListener(socketserver.BaseRequestHandler):
def handle(self):
"""Implements BaseRequestHandler handle method"""
while True:
raw_len = self.request.recv(4)
if raw_len == '':
if raw_len == b'':
return
msglen = struct.unpack('>I', raw_len)[0]
msg = self.request.recv(msglen)
msg = self.request.recv(msglen).decode()
if msg == '':
return
@ -575,7 +575,7 @@ class GstValidateTest(Test):
self.actions_infos.append(action_infos)
def server_wrapper(self, ready):
self.server = SocketServer.TCPServer(('localhost', 0), GstValidateListener)
self.server = socketserver.TCPServer(('localhost', 0), GstValidateListener)
self.server.socket.settimeout(0.0)
self.server.test = self
self.serverport = self.server.socket.getsockname()[1]
@ -709,7 +709,7 @@ class GstValidateTest(Test):
for key in ['bug', 'sometimes']:
if key in expected_failure:
del expected_failure[key]
for key, value in report.items():
for key, value in list(report.items()):
if key in expected_failure:
if not re.findall(expected_failure[key], value):
return False
@ -826,7 +826,7 @@ class GstValidateEncodingTestInterface(object):
def get_current_size(self):
try:
size = os.stat(urlparse.urlparse(self.dest_file).path).st_size
size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
except OSError:
return None
@ -963,7 +963,7 @@ class TestsManager(Loggable):
self.wanted_tests_patterns = []
self.blacklisted_tests_patterns = []
self._generators = []
self.queue = Queue.Queue()
self.queue = queue.Queue()
self.jobs = []
self.total_num_tests = 0
self.starting_test_num = 0
@ -979,7 +979,7 @@ class TestsManager(Loggable):
def add_expected_issues(self, expected_failures):
expected_failures_re = {}
for test_name_regex, failures in expected_failures.items():
for test_name_regex, failures in list(expected_failures.items()):
regex = re.compile(test_name_regex)
expected_failures_re[regex] = failures
for test in self.tests:
@ -989,7 +989,7 @@ class TestsManager(Loggable):
self.expected_failures.update(expected_failures_re)
def add_test(self, test):
for regex, failures in self.expected_failures.items():
for regex, failures in list(self.expected_failures.items()):
if regex.findall(test.classname):
test.expected_failures.extend(failures)
@ -1094,7 +1094,7 @@ class TestsManager(Loggable):
# Check process every second for timeout
try:
self.queue.get(timeout=1)
except Queue.Empty:
except queue.Empty:
pass
for test in self.jobs:
@ -1232,7 +1232,7 @@ class _TestsLauncher(Loggable):
files = []
for f in files:
if f.endswith(".py"):
execfile(os.path.join(app_dir, f), env)
exec(compile(open(os.path.join(app_dir, f)).read(), os.path.join(app_dir, f), 'exec'), env)
def _exec_apps(self, env):
app_dirs = self._list_app_dirs()
@ -1315,7 +1315,7 @@ class _TestsLauncher(Loggable):
globals()["options"] = options
c__file__ = __file__
globals()["__file__"] = self.options.config
execfile(self.options.config, globals())
exec(compile(open(self.options.config).read(), self.options.config, 'exec'), globals())
globals()["__file__"] = c__file__
def set_settings(self, options, args):
@ -1374,7 +1374,7 @@ class _TestsLauncher(Loggable):
and tester.check_testslist:
try:
testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
'rw')
'r+')
know_tests = testlist_file.read().split("\n")
testlist_file.close()
@ -1410,7 +1410,7 @@ class _TestsLauncher(Loggable):
return -1
self.tests.extend(tests)
return sorted(list(self.tests))
return sorted(list(self.tests), key=lambda t: t.classname)
def _run_tests(self):
cur_test_num = 0
@ -1458,7 +1458,7 @@ class NamedDic(object):
def __init__(self, props):
if props:
for name, value in props.iteritems():
for name, value in props.items():
setattr(self, name, value)
@ -1562,7 +1562,7 @@ class ScenarioManager(Loggable):
except subprocess.CalledProcessError:
pass
config = ConfigParser.ConfigParser()
config = configparser.RawConfigParser()
f = open(scenario_defs)
config.readfp(f)
@ -1582,7 +1582,8 @@ class ScenarioManager(Loggable):
name = section
path = None
scenarios.append(Scenario(name, config.items(section), path))
props = config.items(section)
scenarios.append(Scenario(name, props, path))
if not scenario_paths:
self.discovered = True
@ -1744,7 +1745,7 @@ class GstValidateMediaDescriptor(MediaDescriptor):
self.media_xml.attrib["duration"]
self.media_xml.attrib["seekable"]
self.set_protocol(urlparse.urlparse(urlparse.urlparse(self.get_uri()).scheme).scheme)
self.set_protocol(urllib.parse.urlparse(urllib.parse.urlparse(self.get_uri()).scheme).scheme)
@staticmethod
def new_from_uri(uri, verbose=False, full=False):
@ -1808,7 +1809,7 @@ class GstValidateMediaDescriptor(MediaDescriptor):
return self.media_xml.attrib["uri"]
def get_duration(self):
return long(self.media_xml.attrib["duration"])
return int(self.media_xml.attrib["duration"])
def set_protocol(self, protocol):
self.media_xml.attrib["protocol"] = protocol

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2015,Thibault Saunier <thibault.saunier@collabora.com>
#

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
@ -19,10 +19,10 @@
import os
import time
import loggable
from . import loggable
import subprocess
import sys
import urllib2
import urllib.request, urllib.error, urllib.parse
logcat = "httpserver"
@ -42,10 +42,10 @@ class HTTPServer(loggable.Loggable):
start = time.time()
while True:
try:
response = urllib2.urlopen('http://127.0.0.1:%s' % (
response = urllib.request.urlopen('http://127.0.0.1:%s' % (
self.options.http_server_port))
return True
except urllib2.URLError as e:
except urllib.error.URLError as e:
pass
if time.time() - start > timeout:
@ -61,7 +61,7 @@ class HTTPServer(loggable.Loggable):
if self._check_is_up(timeout=2):
return True
print "Starting Server"
print("Starting Server")
try:
self.debug("Launching http server")
cmd = "%s %s %d %s" % (sys.executable, os.path.join(os.path.dirname(__file__),
@ -85,14 +85,14 @@ class HTTPServer(loggable.Loggable):
time.sleep(1)
if self._check_is_up():
print "Started"
print("Started")
return True
else:
print "Failed starting server"
print("Failed starting server")
self._process.terminate()
self._process = None
except OSError as ex:
print "Failed starting server"
print("Failed starting server")
self.warning(logcat, "Could not launch server %s" % ex)
return False

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2014,Thibault Saunier <thibault.saunier@collabora.com>
#
@ -18,20 +18,20 @@
# Boston, MA 02110-1301, USA.
import os
import sys
import utils
import urlparse
import loggable
from . import utils
import urllib.parse
from . import loggable
import argparse
import tempfile
import reporters
from . import reporters
import subprocess
from loggable import Loggable
from httpserver import HTTPServer
from vfb_server import get_virual_frame_buffer_server
from baseclasses import _TestsLauncher, ScenarioManager
from utils import printc, path2url, DEFAULT_MAIN_DIR, launch_command, Colors, Protocols, which
from .loggable import Loggable
from .httpserver import HTTPServer
from .vfb_server import get_virual_frame_buffer_server
from .baseclasses import _TestsLauncher, ScenarioManager
from .utils import printc, path2url, DEFAULT_MAIN_DIR, launch_command, Colors, Protocols, which
LESS = "less"
@ -264,7 +264,7 @@ class LauncherConfig(Loggable):
% self.redirect_logs, Colors.FAIL, True)
return False
if urlparse.urlparse(self.dest).scheme == "":
if urllib.parse.urlparse(self.dest).scheme == "":
self.dest = path2url(self.dest)
if self.no_color:
@ -506,7 +506,7 @@ Note that all testsuite should be inside python modules, so the directory should
tests_launcher.add_options(parser)
if _help_message == HELP and which(LESS):
tmpf = tempfile.NamedTemporaryFile()
tmpf = tempfile.NamedTemporaryFile(mode='r+')
parser.print_help(file=tmpf)
exit(os.system("%s %s" % (LESS, tmpf.name)))
@ -560,17 +560,18 @@ Note that all testsuite should be inside python modules, so the directory should
# Also happened here: https://cgit.freedesktop.org/gstreamer/gst-plugins-good/commit/tests/check/Makefile.am?id=8e2c1d1de56bddbff22170f8b17473882e0e63f9
os.environ['GSETTINGS_BACKEND'] = "memory"
e = None
exception = None
try:
tests_launcher.run_tests()
except Exception as e:
exception = e
pass
finally:
tests_launcher.final_report()
tests_launcher.clean_tests()
httpsrv.stop()
vfb_server.stop()
if e is not None:
raise
if exception is not None:
raise exception
return 0

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
@ -25,11 +25,11 @@ import time
import codecs
import datetime
import tempfile
from loggable import Loggable
from .loggable import Loggable
from xml.sax import saxutils
from utils import Result, printc, Colors
from .utils import Result, printc, Colors
UNICODE_STRINGS = (type(unicode()) == type(str())) # noqa
UNICODE_STRINGS = (type(str()) == type(str())) # noqa
class UnknownResult(Exception):
@ -91,14 +91,14 @@ class Reporter(Loggable):
self.add_results(test)
def final_report(self):
print "\n"
print("\n")
printc("Final Report:", title=True)
for test in sorted(self.results, key=lambda test: test.result):
printc(test)
if test.result != Result.PASSED:
print "\n"
print("\n")
print "\n"
print("\n")
lenstat = (len("Statistics") + 1)
printc("Statistics:\n%s" % (lenstat * "-"), Colors.OKBLUE)
printc("\n%sTotal time spent: %s seconds\n" %
@ -157,7 +157,7 @@ class XunitReporter(Reporter):
def _quoteattr(self, attr):
"""Escape an XML attribute. Value can be unicode."""
attr = xml_safe(attr)
if isinstance(attr, unicode) and not UNICODE_STRINGS:
if isinstance(attr, str) and not UNICODE_STRINGS:
attr = attr.encode(self.encoding)
return saxutils.quoteattr(attr)
@ -175,10 +175,10 @@ class XunitReporter(Reporter):
self.stats['total'] = (self.stats['timeout'] + self.stats['failures'] +
self.stats['passed'] + self.stats['skipped'])
xml_file.write(u'<?xml version="1.0" encoding="%(encoding)s"?>'
u'<testsuite name="gst-validate-launcher" tests="%(total)d" '
u'errors="%(timeout)d" failures="%(failures)d" '
u'skip="%(skipped)d">' % self.stats)
xml_file.write('<?xml version="1.0" encoding="%(encoding)s"?>'
'<testsuite name="gst-validate-launcher" tests="%(total)d" '
'errors="%(timeout)d" failures="%(failures)d" '
'skip="%(skipped)d">' % self.stats)
tmp_xml_file = codecs.open(self.tmp_xml_file.name, 'r',
self.encoding, 'replace')
@ -186,7 +186,7 @@ class XunitReporter(Reporter):
for l in tmp_xml_file:
xml_file.write(l)
xml_file.write(u'</testsuite>')
xml_file.write('</testsuite>')
xml_file.close()
tmp_xml_file.close()
os.remove(self.tmp_xml_file.name)

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
@ -22,14 +22,14 @@ import config
import os
import re
import sys
import urllib
import urlparse
import urllib.request, urllib.parse, urllib.error
import urllib.parse
import subprocess
from operator import itemgetter
GST_SECOND = long(1000000000)
GST_SECOND = int(1000000000)
DEFAULT_TIMEOUT = 30
DEFAULT_MAIN_DIR = os.path.join(os.path.expanduser("~"), "gst-validate")
DEFAULT_GST_QA_ASSETS = os.path.join(DEFAULT_MAIN_DIR, "gst-integration-testsuites")
@ -86,7 +86,7 @@ def mkdir(directory):
def which(name, extra_path=None):
exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep))
exts = [_f for _f in os.environ.get('PATHEXT', '').split(os.pathsep) if _f]
path = os.environ.get('PATH', '')
if extra_path:
path = extra_path + os.pathsep + path
@ -142,20 +142,20 @@ def launch_command(command, color=None, fails=False):
def path2url(path):
return urlparse.urljoin('file:', urllib.pathname2url(path))
return urllib.parse.urljoin('file:', urllib.request.pathname2url(path))
def url2path(url):
path = urlparse.urlparse(url).path
path = urllib.parse.urlparse(url).path
if "win32" in sys.platform:
if path[0] == '/':
return path[1:] # We need to remove the first '/' on windows
path = urllib.unquote(path)
path = urllib.parse.unquote(path)
return path
def isuri(string):
url = urlparse.urlparse(string)
url = urllib.parse.urlparse(string)
if url.scheme != "" and url.scheme != "":
return True
@ -169,7 +169,7 @@ def touch(fname, times=None):
def get_subclasses(klass, env):
subclasses = []
for symb in env.iteritems():
for symb in env.items():
try:
if issubclass(symb[1], klass) and not symb[1] is klass:
subclasses.append(symb[1])
@ -216,15 +216,15 @@ def get_data_file(subdir, name):
def gsttime_from_tuple(stime):
return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
return int((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
timeregex = re.compile(r'(?P<_0>.+):(?P<_1>.+):(?P<_2>.+)\.(?P<_3>.+)')
def parse_gsttimeargs(time):
stime = map(itemgetter(1), sorted(
timeregex.match(time).groupdict().items()))
return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
stime = list(map(itemgetter(1), sorted(
timeregex.match(time).groupdict().items())))
return int((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
def get_duration(media_file):
@ -232,7 +232,7 @@ def get_duration(media_file):
duration = 0
res = ''
try:
res = subprocess.check_output([DISCOVERER_COMMAND, media_file])
res = subprocess.check_output([DISCOVERER_COMMAND, media_file]).decode()
except subprocess.CalledProcessError:
# gst-media-check returns !0 if seeking is not possible, we do not care
# in that case.

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2015,Thibault Saunier <tsaunier@gnome.org>
#
@ -19,7 +19,7 @@
import os
import time
import loggable
from . import loggable
import subprocess
@ -55,7 +55,7 @@ class Xvfb(VirtualFrameBufferServer):
os.environ["DISPLAY"] = self.display_id
subprocess.check_output(["xset", "q"],
stderr=self._logsfile)
print("DISPLAY set to %s" % self.display_id)
print(("DISPLAY set to %s" % self.display_id))
return True
except subprocess.CalledProcessError:
pass

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2015, Edward Hervey <edward@centricular.com>
#

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
#
# Copyright (c) 2014,Thibault Saunier <thibault.saunier@collabora.com>
#
@ -31,7 +31,7 @@ def _get_git_first_hash(path):
cdir = os.path.abspath(os.curdir)
try:
os.chdir(path)
res = subprocess.check_output(['git', 'rev-list', '--max-parents=0', 'HEAD']).rstrip('\n')
res = subprocess.check_output(['git', 'rev-list', '--max-parents=0', 'HEAD']).decode().rstrip('\n')
except (subprocess.CalledProcessError, OSError):
res = ''
finally:
@ -48,7 +48,7 @@ def _in_devel():
def _add_gst_launcher_path():
if _in_devel():
print "Running with development path"
print("Running with development path")
dir_ = os.path.dirname(os.path.abspath(__file__))
root = os.path.split(dir_)[0]
elif __file__.startswith(BUILDDIR):