validate-launcher: pep8ify sources.

https://bugzilla.gnome.org/show_bug.cgi?id=739208
This commit is contained in:
Mathieu Duponchelle 2014-10-24 14:23:52 +02:00
parent fa39e0358a
commit 8c1e84b5f4
9 changed files with 359 additions and 253 deletions

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python2
#Portions Copyright (C) 2009,2010 Xyne
#Portions Copyright (C) 2011 Sean Goller
# Portions Copyright (C) 2009,2010 Xyne
# Portions Copyright (C) 2011 Sean Goller
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@ -15,7 +15,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Range HTTP Server.
@ -48,6 +49,7 @@ except ImportError:
_bandwidth = 0
class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""Simple HTTP request handler with GET and HEAD commands.
@ -151,7 +153,8 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
ei = int(e)
if ei < size:
start_range = size - ei
self.send_header("Content-Range", 'bytes ' + str(start_range) + '-' + str(end_range - 1) + '/' + str(size))
self.send_header("Content-Range", 'bytes ' + str(
start_range) + '-' + str(end_range - 1) + '/' + str(size))
self.send_header("Content-Length", end_range - start_range)
self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
self.end_headers()
@ -174,7 +177,8 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
f = StringIO()
displaypath = cgi.escape(urllib.unquote(self.path))
f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
f.write("<html>\n<title>Directory listing for %s</title>\n" %
displaypath)
f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
f.write("<hr>\n<ul>\n")
for name in list:
@ -207,8 +211,8 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""
# abandon query parameters
path = path.split('?',1)[0]
path = path.split('#',1)[0]
path = path.split('?', 1)[0]
path = path.split('#', 1)[0]
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
@ -216,7 +220,8 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir): continue
if word in (os.curdir, os.pardir):
continue
path = os.path.join(path, word)
return path
@ -261,20 +266,20 @@ class RangeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
return self.extensions_map['']
if not mimetypes.inited:
mimetypes.init() # try to read system mime.types
mimetypes.init() # try to read system mime.types
extensions_map = mimetypes.types_map.copy()
extensions_map.update({
'': 'application/octet-stream', # Default
'': 'application/octet-stream', # Default
'.py': 'text/plain',
'.c': 'text/plain',
'.h': 'text/plain',
'.mp4': 'video/mp4',
'.ogg': 'video/ogg',
})
})
def test(HandlerClass = RangeHTTPRequestHandler,
ServerClass = BaseHTTPServer.HTTPServer):
def test(HandlerClass=RangeHTTPRequestHandler,
ServerClass=BaseHTTPServer.HTTPServer):
BaseHTTPServer.test(HandlerClass, ServerClass)

View file

@ -32,9 +32,9 @@ from baseclasses import GstValidateTest, TestsManager, Test, \
from utils import path2url, DEFAULT_TIMEOUT, which, \
GST_SECOND, Result, Protocols
######################################
# Private global variables #
######################################
#
# Private global variables #
#
# definitions of commands to use
GST_VALIDATE_COMMAND = "gst-validate-1.0"
@ -47,9 +47,9 @@ if "win32" in sys.platform:
AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5
#################################################
# API to be used to create testsuites #
#################################################
#
# API to be used to create testsuites #
#
"""
Some info about protocols and how to handle them
@ -62,6 +62,7 @@ GST_VALIDATE_PROTOCOL_TIMEOUTS = {Protocols.HTTP: 120,
class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator):
def __init__(self, test_manager):
GstValidateTestsGenerator.__init__(self, "media_check", test_manager)
@ -85,6 +86,7 @@ class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator):
class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator):
def __init__(self, test_manager):
GstValidateTestsGenerator.__init__(self, "transcode", test_manager)
@ -95,7 +97,8 @@ class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator):
for comb in self.test_manager.get_encoding_formats():
classname = "validate.%s.transcode.to_%s.%s" % (mediainfo.media_descriptor.get_protocol(),
str(comb).replace(' ', '_'),
str(comb).replace(
' ', '_'),
mediainfo.media_descriptor.get_clean_name())
self.add_test(GstValidateTranscodingTest(classname,
self.test_manager.options,
@ -106,7 +109,9 @@ class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator):
class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
def __init__(self, name, test_manager, pipeline_template=None, pipelines_descriptions=None,
def __init__(
self, name, test_manager, pipeline_template=None, pipelines_descriptions=None,
valid_scenarios=[]):
"""
@name: The name of the generator
@ -138,10 +143,10 @@ class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
if self._valid_scenarios:
scenarios = [scenario for scenario in scenarios if
scenario.name in self._valid_scenarios]
scenario.name in self._valid_scenarios]
return super(GstValidatePipelineTestsGenerator, self).generate_tests(
uri_minfo_special_scenarios, scenarios)
uri_minfo_special_scenarios, scenarios)
def populate_tests(self, uri_minfo_special_scenarios, scenarios):
for name, pipeline in self._pipelines_descriptions:
@ -158,7 +163,8 @@ class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator):
def __init__(self, test_manager):
GstValidatePipelineTestsGenerator.__init__(self, "playback", test_manager, "playbin")
GstValidatePipelineTestsGenerator.__init__(
self, "playback", test_manager, "playbin")
def populate_tests(self, uri_minfo_special_scenarios, scenarios):
for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
@ -177,7 +183,8 @@ class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator):
else:
fakesink = "'fakesink'"
cpipe += " audio-sink=%s video-sink=%s" %(fakesink, fakesink)
cpipe += " audio-sink=%s video-sink=%s" % (
fakesink, fakesink)
fname = "%s.%s" % (self.get_fname(scenario,
protocol),
@ -198,16 +205,20 @@ class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator):
class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
def __init__(self, name, test_manager, mixer, media_type, converter="", num_sources=3,
def __init__(
self, name, test_manager, mixer, media_type, converter="", num_sources=3,
mixed_srcs={}, valid_scenarios=[]):
pipe_template = "%(mixer)s name=_mixer ! " + converter + " ! %(sink)s "
pipe_template = "%(mixer)s name=_mixer ! " + \
converter + " ! %(sink)s "
self.converter = converter
self.mixer = mixer
self.media_type = media_type
self.num_sources = num_sources
self.mixed_srcs = mixed_srcs
super(GstValidateMixerTestsGenerator, self).__init__(name, test_manager, pipe_template,
valid_scenarios=valid_scenarios)
super(
GstValidateMixerTestsGenerator, self).__init__(name, test_manager, pipe_template,
valid_scenarios=valid_scenarios)
def populate_tests(self, uri_minfo_special_scenarios, scenarios):
wanted_ressources = []
@ -227,7 +238,8 @@ class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
name = ""
for nsource in range(self.num_sources):
uri, minfo = wanted_ressources[i + nsource]
srcs.append("uridecodebin uri=%s ! %s" % (uri, self.converter))
srcs.append(
"uridecodebin uri=%s ! %s" % (uri, self.converter))
fname = os.path.basename(uri).replace(".", "_")
if not name:
name = fname
@ -238,7 +250,8 @@ class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
for name, srcs in self.mixed_srcs.iteritems():
if isinstance(srcs, dict):
pipe_arguments = {"mixer": self.mixer + " %s" % srcs["mixer_props"]}
pipe_arguments = {
"mixer": self.mixer + " %s" % srcs["mixer_props"]}
srcs = srcs["sources"]
else:
pipe_arguments = {"mixer": self.mixer}
@ -268,10 +281,12 @@ class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
class GstValidateLaunchTest(GstValidateTest):
def __init__(self, classname, options, reporter, pipeline_desc,
timeout=DEFAULT_TIMEOUT, scenario=None, media_descriptor=None):
try:
timeout = GST_VALIDATE_PROTOCOL_TIMEOUTS[media_descriptor.get_protocol()]
timeout = GST_VALIDATE_PROTOCOL_TIMEOUTS[
media_descriptor.get_protocol()]
except KeyError:
pass
except AttributeError:
@ -282,11 +297,12 @@ class GstValidateLaunchTest(GstValidateTest):
duration = scenario.get_duration()
elif media_descriptor:
duration = media_descriptor.get_duration() / GST_SECOND
super(GstValidateLaunchTest, self).__init__(GST_VALIDATE_COMMAND, classname,
options, reporter,
duration=duration,
scenario=scenario,
timeout=timeout)
super(
GstValidateLaunchTest, self).__init__(GST_VALIDATE_COMMAND, classname,
options, reporter,
duration=duration,
scenario=scenario,
timeout=timeout)
self.pipeline_desc = pipeline_desc
self.media_descriptor = media_descriptor
@ -295,7 +311,8 @@ class GstValidateLaunchTest(GstValidateTest):
GstValidateTest.build_arguments(self)
self.add_arguments(self.pipeline_desc)
if self.media_descriptor is not None:
self.add_arguments("--set-media-info", self.media_descriptor.get_path())
self.add_arguments(
"--set-media-info", self.media_descriptor.get_path())
def get_current_value(self):
if self.scenario:
@ -310,7 +327,8 @@ class GstValidateLaunchTest(GstValidateTest):
https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
return Result.KNOWN_ERROR
self.set_result(Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
self.set_result(
Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
return Result.FAILED
@ -318,11 +336,14 @@ class GstValidateLaunchTest(GstValidateTest):
class GstValidateMediaCheckTest(Test):
def __init__(self, classname, options, reporter, media_descriptor, uri, minfo_path,
def __init__(
self, classname, options, reporter, media_descriptor, uri, minfo_path,
timeout=DEFAULT_TIMEOUT):
super(GstValidateMediaCheckTest, self).__init__(G_V_DISCOVERER_COMMAND, classname,
options, reporter,
timeout=timeout)
super(
GstValidateMediaCheckTest, self).__init__(G_V_DISCOVERER_COMMAND, classname,
options, reporter,
timeout=timeout)
self._uri = uri
self.media_descriptor = media_descriptor
self._media_info_path = minfo_path
@ -334,6 +355,7 @@ class GstValidateMediaCheckTest(Test):
class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterface):
scenarios_manager = ScenarioManager()
def __init__(self, classname, options, reporter,
combination, uri, media_descriptor,
timeout=DEFAULT_TIMEOUT,
@ -344,25 +366,28 @@ class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterfa
file_dur = long(media_descriptor.get_duration()) / GST_SECOND
if not media_descriptor.get_num_tracks("video"):
self.debug("%s audio only file applying transcoding ratio."
"File 'duration' : %s" % (classname , file_dur))
"File 'duration' : %s" % (classname, file_dur))
duration = file_dur / AUDIO_ONLY_FILE_TRANSCODING_RATIO
else:
duration = file_dur
try:
timeout = GST_VALIDATE_PROTOCOL_TIMEOUTS[media_descriptor.get_protocol()]
timeout = GST_VALIDATE_PROTOCOL_TIMEOUTS[
media_descriptor.get_protocol()]
except KeyError:
pass
super(GstValidateTranscodingTest, self).__init__(GST_VALIDATE_TRANSCODING_COMMAND,
classname,
options,
reporter,
duration=duration,
timeout=timeout,
scenario=scenario)
super(
GstValidateTranscodingTest, self).__init__(GST_VALIDATE_TRANSCODING_COMMAND,
classname,
options,
reporter,
duration=duration,
timeout=timeout,
scenario=scenario)
GstValidateEncodingTestInterface.__init__(self, combination, media_descriptor)
GstValidateEncodingTestInterface.__init__(
self, combination, media_descriptor)
self.media_descriptor = media_descriptor
self.uri = uri
@ -396,7 +421,8 @@ class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterfa
https://bugzilla.gnome.org/show_bug.cgi?id=723868""")
return Result.KNOWN_ERROR
self.set_result(Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
self.set_result(
Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")
return Result.FAILED
@ -451,8 +477,8 @@ class GstValidateTestManager(GstValidateBaseTestManager):
def add_options(self, parser):
group = parser.add_argument_group("GstValidate tools specific options"
" and behaviours",
description="""When using --wanted-tests, all the scenarios can be used, even those which have
" and behaviours",
description="""When using --wanted-tests, all the scenarios can be used, even those which have
not been tested and explicitely activated if you set use --wanted-tests ALL""")
def populate_testsuite(self):
@ -506,7 +532,8 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
break
scenario_bname = media_descriptor.get_media_filepath()
special_scenarios = self.scenarios_manager.find_special_scenarios(scenario_bname)
special_scenarios = self.scenarios_manager.find_special_scenarios(
scenario_bname)
self._uris.append((uri,
NamedDic({"path": media_info,
"media_descriptor": media_descriptor}),
@ -516,7 +543,8 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
def _discover_file(self, uri, fpath):
try:
media_info = "%s.%s" % (fpath, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
media_info = "%s.%s" % (
fpath, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
args = G_V_DISCOVERER_COMMAND.split(" ")
args.append(uri)
if os.path.isfile(media_info):
@ -528,7 +556,8 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
elif not self.options.generate_info:
return True
media_descriptor = GstValidateMediaDescriptor.new_from_uri(uri, True,
media_descriptor = GstValidateMediaDescriptor.new_from_uri(
uri, True,
self.options.generate_info_full)
if media_descriptor:
self._add_media(media_descriptor, uri)
@ -574,7 +603,7 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
uri = test.media_descriptor.get_uri()
if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] and \
"127.0.0.1:%s" % (self.options.http_server_port) in uri:
"127.0.0.1:%s" % (self.options.http_server_port) in uri:
return True
return False
@ -583,13 +612,16 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
for i in range(len(options.wanted_tests)):
if "ALL" in options.wanted_tests[i]:
self._run_defaults = False
options.wanted_tests[i] = options.wanted_tests[i].replace("ALL", "")
options.wanted_tests[
i] = options.wanted_tests[i].replace("ALL", "")
try:
options.wanted_tests.remove("")
except ValueError:
pass
super(GstValidateTestManager, self).set_settings(options, args, reporter)
super(GstValidateTestManager, self).set_settings(
options, args, reporter)
def gst_validate_checkout_element_present(element_name):
null = open(os.devnull)

View file

@ -20,12 +20,12 @@
# Boston, MA 02110-1301, USA.
valid_mixing_scenarios=["play_15s",
"fast_forward",
"seek_forward",
"seek_backward",
"seek_with_stop",
"scrub_forward_seeking"]
valid_mixing_scenarios = ["play_15s",
"fast_forward",
"seek_forward",
"seek_backward",
"seek_with_stop",
"scrub_forward_seeking"]
def register_compositing_tests(self):
@ -35,11 +35,12 @@ def register_compositing_tests(self):
"""
for compositor in ["compositor", "glmixer"]:
if gst_validate_checkout_element_present(compositor):
self.add_generators(GstValidateMixerTestsGenerator(compositor, self,
compositor,
"video",
converter="deinterlace ! videoconvert ! videorate ! videoscale ! video/x-raw,framerate=25/1,pixel-aspect-ratio=1/1",
valid_scenarios=valid_mixing_scenarios))
self.add_generators(
GstValidateMixerTestsGenerator(compositor, self,
compositor,
"video",
converter="deinterlace ! videoconvert ! videorate ! videoscale ! video/x-raw,framerate=25/1,pixel-aspect-ratio=1/1",
valid_scenarios=valid_mixing_scenarios))
def register_default_test_generators(self):
@ -51,20 +52,21 @@ def register_default_test_generators(self):
GstValidateTranscodingTestsGenerator(self)])
for compositor in ["compositor", "glvideomixer"]:
self.add_generators(GstValidateMixerTestsGenerator(compositor + ".simple", self,
compositor,
"video",
converter="deinterlace ! videoconvert",
mixed_srcs= {
"synchronized": {"mixer_props": "sink_1::alpha=0.5 sink_1::xpos=50 sink_1::ypos=50",
"sources":
("videotestsrc pattern=snow timestamp-offset=3000000000 ! 'video/x-raw,format=AYUV,width=640,height=480,framerate=(fraction)30/1' ! timeoverlay",
"videotestsrc pattern=smpte ! 'video/x-raw,format=AYUV,width=800,height=600,framerate=(fraction)10/1' ! timeoverlay")},
self.add_generators(
GstValidateMixerTestsGenerator(compositor + ".simple", self,
compositor,
"video",
converter="deinterlace ! videoconvert",
mixed_srcs={
"synchronized": {"mixer_props": "sink_1::alpha=0.5 sink_1::xpos=50 sink_1::ypos=50",
"sources":
("videotestsrc pattern=snow timestamp-offset=3000000000 ! 'video/x-raw,format=AYUV,width=640,height=480,framerate=(fraction)30/1' ! timeoverlay",
"videotestsrc pattern=smpte ! 'video/x-raw,format=AYUV,width=800,height=600,framerate=(fraction)10/1' ! timeoverlay")},
"bgra":
("videotestsrc ! video/x-raw, framerate=\(fraction\)10/1, width=100, height=100",
"videotestsrc ! video/x-raw, framerate=\(fraction\)5/1, width=320, height=240")
},
valid_scenarios=valid_mixing_scenarios))
},
valid_scenarios=valid_mixing_scenarios))
def register_default_scenarios(self):
@ -72,7 +74,7 @@ def register_default_scenarios(self):
Registers default test scenarios
"""
self.add_scenarios([
"play_15s",
"play_15s",
"reverse_playback",
"fast_forward",
"seek_forward",
@ -86,6 +88,7 @@ def register_default_scenarios(self):
"change_state_intensive",
"scrub_forward_seeking"])
def register_default_encoding_formats(self):
"""
Registers default encoding formats
@ -97,6 +100,7 @@ def register_default_encoding_formats(self):
MediaFormatCombination("mkv", "vorbis", "h264"),
])
def register_default_blacklist(self):
self.set_default_blacklist([
# hls known issues
@ -135,28 +139,34 @@ def register_default_blacklist(self):
'mpegts_base_loop (): ...: stream stopped, reason not-negotiated'),
# HTTP known issues:
("validate.http.*scrub_forward_seeking.*", "This is not stable enough for now."),
("validate.http.*scrub_forward_seeking.*",
"This is not stable enough for now."),
("validate.http.playback.change_state_intensive.raw_video_mov",
"This is not stable enough for now. (flow return from pad push doesn't match expected value)"),
# MXF known issues"
(".*reverse_playback.*mxf", "Reverse playback is not handled in MXF"),
("validate\.file\.transcode.*mxf", "FIXME: Transcoding and mixing tests need to be tested"),
("validate\.file\.transcode.*mxf",
"FIXME: Transcoding and mixing tests need to be tested"),
# Subtitles known issues
("validate.file.playback.switch_subtitle_track.Sintel_2010_720p_mkv", "https://bugzilla.gnome.org/show_bug.cgi?id=734051"),
("validate.file.playback.switch_subtitle_track.Sintel_2010_720p_mkv",
"https://bugzilla.gnome.org/show_bug.cgi?id=734051"),
# Videomixing known issues
("validate.file.*.simple.scrub_forward_seeking.synchronized", "https://bugzilla.gnome.org/show_bug.cgi?id=734060"),
("validate.file.*.simple.scrub_forward_seeking.synchronized",
"https://bugzilla.gnome.org/show_bug.cgi?id=734060"),
# FLAC known issues"
(".*reverse_playback.*flac", "Reverse playback is not handled in flac"),
(".*reverse_playback.*flac",
"Reverse playback is not handled in flac"),
# WMV known issues"
(".*reverse_playback.*wmv", "Reverse playback is not handled in wmv"),
(".*reverse_playback.*asf", "Reverse playback is not handled in asf"),
])
def register_defaults(self):
self.register_default_scenarios()
self.register_default_encoding_formats()

View file

@ -113,7 +113,6 @@ class Test(Loggable):
return value
def get_classname(self):
name = self.classname.split('.')[-1]
classname = self.classname.replace('.%s' % name, '')
@ -137,7 +136,7 @@ class Test(Loggable):
pname = subprocess.check_output(("readlink -e /proc/%s/exe"
% self.process.pid).split(' ')).replace('\n', '')
raw_input("%sTimeout happened you can attach gdb doing: $gdb %s %d%s\n"
"Press enter to continue" %(Colors.FAIL, pname, self.process.pid,
"Press enter to continue" % (Colors.FAIL, pname, self.process.pid,
Colors.ENDC))
self.result = result
@ -156,7 +155,7 @@ class Test(Loggable):
else:
self.set_result(Result.FAILED,
"Application returned %d" % (
self.process.returncode))
self.process.returncode))
def get_current_value(self):
"""
@ -184,7 +183,8 @@ class Test(Loggable):
self.debug("Got value: %s" % val)
if val is Result.NOT_RUN:
# The get_current_value logic is not implemented... dumb timeout
# The get_current_value logic is not implemented... dumb
# timeout
if time.time() - last_change_ts > self.timeout:
self.set_result(Result.TIMEOUT)
break
@ -198,12 +198,14 @@ class Test(Loggable):
if val == last_val:
delta = time.time() - last_change_ts
self.debug("%s: Same value for %d/%d seconds" % (self, delta, self.timeout))
self.debug("%s: Same value for %d/%d seconds" %
(self, delta, self.timeout))
if delta > self.timeout:
self.set_result(Result.TIMEOUT)
break
elif self.hard_timeout and time.time() - start_ts > self.hard_timeout:
self.set_result(Result.TIMEOUT, "Hard timeout reached: %d", self.hard_timeout)
self.set_result(
Result.TIMEOUT, "Hard timeout reached: %d", self.hard_timeout)
break
else:
last_change_ts = time.time()
@ -221,8 +223,8 @@ class Test(Loggable):
proc_env = self.get_subproc_env()
message = "Launching: %s%s\n" \
" Command: '%s %s'\n" %(Colors.ENDC, self.classname,
self._env_variable, self.command)
" Command: '%s %s'\n" % (Colors.ENDC, self.classname,
self._env_variable, self.command)
if not self.reporter.uses_standard_output():
message += " Logs:\n" \
" - %s" % (self.logfile)
@ -267,16 +269,19 @@ class Test(Loggable):
class GstValidateTest(Test):
""" A class representing a particular test. """
findpos_regex = re.compile('.*position.*(\d+):(\d+):(\d+).(\d+).*duration.*(\d+):(\d+):(\d+).(\d+)')
findlastseek_regex = re.compile('seeking to.*(\d+):(\d+):(\d+).(\d+).*stop.*(\d+):(\d+):(\d+).(\d+).*rate.*(\d+)\.(\d+)')
findpos_regex = re.compile(
'.*position.*(\d+):(\d+):(\d+).(\d+).*duration.*(\d+):(\d+):(\d+).(\d+)')
findlastseek_regex = re.compile(
'seeking to.*(\d+):(\d+):(\d+).(\d+).*stop.*(\d+):(\d+):(\d+).(\d+).*rate.*(\d+)\.(\d+)')
def __init__(self, application_name, classname,
options, reporter, duration=0,
timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None):
super(GstValidateTest, self).__init__(application_name, classname, options,
reporter, duration=duration,
timeout=timeout, hard_timeout=hard_timeout)
super(
GstValidateTest, self).__init__(application_name, classname, options,
reporter, duration=duration,
timeout=timeout, hard_timeout=hard_timeout)
# defines how much the process can be outside of the configured
# segment / seek
@ -290,9 +295,11 @@ class GstValidateTest(Test):
def get_subproc_env(self):
if self.reporter.uses_standard_output():
self.validatelogs = os.path.join (tempfile.gettempdir(), 'tmp.validate.logs')
self.validatelogs = os.path.join(
tempfile.gettempdir(), 'tmp.validate.logs')
logfiles = self.validatelogs
logfiles += os.pathsep + self.reporter.out.name.replace("<", '').replace(">", '')
logfiles += os.pathsep + \
self.reporter.out.name.replace("<", '').replace(">", '')
else:
self.validatelogs = self.logfile + '.validate.logs'
logfiles = self.validatelogs
@ -377,9 +384,10 @@ class GstValidateTest(Test):
else:
self.set_result(Result.FAILED,
"Application returned %s (issues: %s)" % (
self.process.returncode,
self.get_validate_criticals_errors()
self.process.returncode,
self.get_validate_criticals_errors()
))
def _parse_position(self, p):
self.log("Parsing %s" % p)
times = self.findpos_regex.findall(p)
@ -391,11 +399,9 @@ class GstValidateTest(Test):
return (utils.gsttime_from_tuple(times[0][:4]),
utils.gsttime_from_tuple(times[0][4:]))
def _parse_buffering(self, b):
return b.split("buffering... ")[1].split("%")[0], 100
def _get_position(self):
position = duration = -1
@ -436,7 +442,6 @@ class GstValidateTest(Test):
self.debug("Could not fine any seeking info")
return start, stop, rate
values = self.findlastseek_regex.findall(m)
if len(values) != 1:
self.warning("Got a unparsable value: %s" % p)
@ -495,8 +500,8 @@ class GstValidateEncodingTestInterface(object):
return size
def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
audio_restriction=None, audio_presence=0,
video_presence=0):
audio_restriction=None, audio_presence=0,
video_presence=0):
ret = "\""
if muxer:
ret += muxer
@ -553,13 +558,15 @@ class GstValidateEncodingTestInterface(object):
for tmptype in possible_mtypes:
possible_c_variant = c.replace(media_type, tmptype)
if possible_c_variant in ccaps:
self.info("Found %s in %s, good enough!", possible_c_variant)
self.info(
"Found %s in %s, good enough!", possible_c_variant)
has_variant = True
return has_variant
def check_encoded_file(self):
result_descriptor = GstValidateMediaDescriptor.new_from_uri(self.dest_file)
result_descriptor = GstValidateMediaDescriptor.new_from_uri(
self.dest_file)
duration = result_descriptor.get_duration()
orig_duration = self.media_descriptor.get_duration()
tolerance = self._duration_tolerance
@ -568,8 +575,8 @@ class GstValidateEncodingTestInterface(object):
os.remove(result_descriptor.get_path())
return (Result.FAILED, "Duration of encoded file is "
" wrong (%s instead of %s)" %
(utils.TIME_ARGS (duration),
utils.TIME_ARGS (orig_duration)))
(utils.TIME_ARGS(duration),
utils.TIME_ARGS(orig_duration)))
else:
all_tracks_caps = result_descriptor.get_tracks_caps()
container_caps = result_descriptor.get_caps()
@ -590,13 +597,12 @@ class GstValidateEncodingTestInterface(object):
for c in cwanted_caps:
if c not in ccaps:
if not self._has_caps_type_variant (c, ccaps):
if not self._has_caps_type_variant(c, ccaps):
os.remove(result_descriptor.get_path())
return (Result.FAILED,
"Field: %s (from %s) not in caps of the outputed file %s"
% (wanted_caps, c, ccaps))
os.remove(result_descriptor.get_path())
return (Result.PASSED, "")
@ -662,7 +668,6 @@ class TestsManager(Loggable):
printc(msg, Colors.FAIL, True)
def add_options(self, parser):
""" Add more arguments. """
pass
@ -701,7 +706,6 @@ class TestsManager(Loggable):
int(self.options.long_limit)))
return False
if not self.wanted_tests_patterns:
return True
@ -735,6 +739,7 @@ class TestsManager(Loggable):
class TestsGenerator(Loggable):
def __init__(self, name, test_manager, tests=[]):
Loggable.__init__(self)
self.name = name
@ -754,6 +759,7 @@ class TestsGenerator(Loggable):
class GstValidateTestsGenerator(TestsGenerator):
def populate_tests(self, uri_minfo_special_scenarios, scenarios):
pass
@ -763,6 +769,7 @@ class GstValidateTestsGenerator(TestsGenerator):
class _TestsLauncher(Loggable):
def __init__(self, libsdir):
Loggable.__init__(self)
@ -775,22 +782,22 @@ class _TestsLauncher(Loggable):
self._list_testers()
self.wanted_tests_patterns = []
def _list_app_dirs (self):
def _list_app_dirs(self):
app_dirs = []
app_dirs.append (os.path.join(self.libsdir, "apps"))
app_dirs.append(os.path.join(self.libsdir, "apps"))
env_dirs = os.environ.get("GST_VALIDATE_APPS_DIR")
if env_dirs is not None:
for dir_ in env_dirs.split(":"):
app_dirs.append (dir_)
app_dirs.append(dir_)
return app_dirs
def _exec_app (self, app_dir, env):
def _exec_app(self, app_dir, env):
for f in os.listdir(app_dir):
if f.endswith(".py"):
execfile(os.path.join(app_dir, f), env)
def _exec_apps (self, env):
def _exec_apps(self, env):
app_dirs = self._list_app_dirs()
for app_dir in app_dirs:
self._exec_app(app_dir, env)
@ -842,7 +849,6 @@ class _TestsLauncher(Loggable):
for tester in self.testers:
tester.set_settings(options, args, self.reporter)
def list_tests(self):
for tester in self.testers:
self.tests.extend(tester.list_tests())
@ -858,7 +864,7 @@ class _TestsLauncher(Loggable):
res = tester.run_tests(cur_test_num, total_num_tests)
cur_test_num += len(tester.list_tests())
if res != Result.PASSED and (self.options.forever or
self.options.fatal_error):
self.options.fatal_error):
return False
return True
@ -892,7 +898,9 @@ class NamedDic(object):
for name, value in props.iteritems():
setattr(self, name, value)
class Scenario(object):
def __init__(self, name, props, path=None):
self.name = name
self.path = path
@ -955,7 +963,7 @@ class ScenarioManager(Loggable):
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ScenarioManager, cls).__new__(
cls, *args, **kwargs)
cls, *args, **kwargs)
cls._instance.config = None
cls._instance.discovered = False
Loggable.__init__(cls._instance)
@ -973,7 +981,6 @@ class ScenarioManager(Loggable):
if scenarios:
scenarios = self.discover_scenarios(scenarios, mfile)
return scenarios
def discover_scenarios(self, scenario_paths=[], mfile=None):
@ -986,10 +993,12 @@ class ScenarioManager(Loggable):
if self.config.logsdir in ["stdout", "stderr"]:
logs = open(os.devnull)
else:
logs = open(os.path.join(self.config.logsdir, "scenarios_discovery.log"), 'w')
logs = open(
os.path.join(self.config.logsdir, "scenarios_discovery.log"), 'w')
try:
command = [self.GST_VALIDATE_COMMAND, "--scenarios-defs-output-file", scenario_defs]
command = [self.GST_VALIDATE_COMMAND,
"--scenarios-defs-output-file", scenario_defs]
command.extend(scenario_paths)
subprocess.check_call(command, stdout=logs, stderr=logs)
except subprocess.CalledProcessError:
@ -1005,7 +1014,8 @@ class ScenarioManager(Loggable):
if section in scenario_path:
# The real name of the scenario is:
# filename.REALNAME.scenario
name = scenario_path.replace(mfile + ".", "").replace("." + self.FILE_EXTENDION, "")
name = scenario_path.replace(mfile + ".", "").replace(
"." + self.FILE_EXTENDION, "")
path = scenario_path
else:
name = section
@ -1056,7 +1066,6 @@ class GstValidateBaseTestManager(TestsManager):
def get_scenarios(self):
return self._scenarios
def add_encoding_formats(self, encoding_formats):
"""
:param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
@ -1075,6 +1084,7 @@ class GstValidateBaseTestManager(TestsManager):
class MediaDescriptor(Loggable):
def __init__(self):
Loggable.__init__(self)
@ -1117,8 +1127,10 @@ class MediaDescriptor(Loggable):
return False
if self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
self.debug("Do not run %s as %s is too short (%i < min media duation : %i",
scenario, self.get_uri(), self.get_duration() / GST_SECOND,
self.debug(
"Do not run %s as %s is too short (%i < min media duation : %i",
scenario, self.get_uri(
), self.get_duration() / GST_SECOND,
scenario.get_min_media_duration())
return False
@ -1133,8 +1145,6 @@ class MediaDescriptor(Loggable):
return True
class GstValidateMediaDescriptor(MediaDescriptor):
# Some extension file for discovering results
MEDIA_INFO_EXT = "media_info"
@ -1157,7 +1167,8 @@ class GstValidateMediaDescriptor(MediaDescriptor):
@staticmethod
def new_from_uri(uri, verbose=False, full=False):
media_path = utils.url2path(uri)
descriptor_path = "%s.%s" % (media_path, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
descriptor_path = "%s.%s" % (
media_path, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
args = GstValidateMediaDescriptor.DISCOVERER_COMMAND.split(" ")
args.append(uri)
@ -1167,7 +1178,7 @@ class GstValidateMediaDescriptor(MediaDescriptor):
if verbose:
printc("Generating media info for %s\n"
" Command: '%s'" % (media_path, ' '.join(args)),
" Command: '%s'" % (media_path, ' '.join(args)),
Colors.OKBLUE)
try:
@ -1243,6 +1254,7 @@ class GstValidateMediaDescriptor(MediaDescriptor):
return name.replace('.', "_")
class MediaFormatCombination(object):
FORMATS = {"aac": "audio/mpeg,mpegversion=4",
"ac3": "audio/x-ac3",
@ -1256,7 +1268,6 @@ class MediaFormatCombination(object):
"mp4": "video/quicktime,variant=iso;",
"webm": "video/webm"}
def __str__(self):
return "%s and %s in %s" % (self.audio, self.video, self.container)

View file

@ -28,7 +28,9 @@ logcat = "httpserver"
class HTTPServer(loggable.Loggable):
""" Class to run a SimpleHttpServer in a process."""
def __init__(self, options):
loggable.Loggable.__init__(self)
self.options = options
@ -60,8 +62,8 @@ class HTTPServer(loggable.Loggable):
self._logsfile = tempfile.TemporaryFile()
else:
self._logsfile = open(os.path.join(self.options.logsdir,
"httpserver.logs"),
'w+')
"httpserver.logs"),
'w+')
if self.options.http_server_dir is not None:
if self._check_is_up(timeout=2):
return True
@ -70,15 +72,16 @@ class HTTPServer(loggable.Loggable):
try:
self.debug("Lunching http server")
cmd = "%s %s %d %s" % (sys.executable, os.path.join(os.path.dirname(__file__),
"RangeHTTPServer.py"),
self.options.http_server_port,
self.options.http_bandwith,
)
"RangeHTTPServer.py"),
self.options.http_server_port,
self.options.http_bandwith,
)
curdir = os.path.abspath(os.curdir)
os.chdir(self.options.http_server_dir)
#cmd = "twistd -no web --path=%s -p %d" % (
# self.options.http_server_dir, self.options.http_server_port)
self.debug("Lunching server: %s (logs in %s)", cmd, self._logsfile)
# cmd = "twistd -no web --path=%s -p %d" % (
# self.options.http_server_dir, self.options.http_server_port)
self.debug(
"Lunching server: %s (logs in %s)", cmd, self._logsfile)
self._process = subprocess.Popen(cmd.split(" "),
stderr=self._logsfile,
stdout=self._logsfile)

View file

@ -69,6 +69,7 @@ _LEVEL_NAMES = ['ERROR', 'WARN', 'FIXME', 'INFO', 'DEBUG', 'LOG']
class TerminalController(object):
"""
A class that can be used to portably generate formatted output to
a terminal.
@ -197,7 +198,8 @@ class TerminalController(object):
if set_bg_ansi:
for i, color in zip(range(len(self._ANSICOLORS)),
self._ANSICOLORS):
setattr(self, 'BG_' + color, curses.tparm(set_bg_ansi, i) or '')
setattr(
self, 'BG_' + color, curses.tparm(set_bg_ansi, i) or '')
def _tigetstr(self, cap_name):
# String capabilities can include "delays" of the form "$<2>".
@ -222,12 +224,13 @@ class TerminalController(object):
else:
return getattr(self, s[2:-1])
#######################################################################
#
# Example use case: progress bar
#######################################################################
#
class ProgressBar:
"""
A 3-line progress bar, which looks like::
@ -643,16 +646,18 @@ def stderrHandler(level, object, category, file, line, message):
# show a bazillion of debug details that are not relevant to Pitivi.
if not _enableCrackOutput:
safeprintf(sys.stderr, '%s %-8s %-17s %-2s %s %s\n',
getFormattedLevelName(level), time.strftime("%H:%M:%S"),
category, "", message, where)
getFormattedLevelName(level), time.strftime("%H:%M:%S"),
category, "", message, where)
else:
o = ""
if object:
o = '"' + object + '"'
# level pid object cat time
# 5 + 1 + 7 + 1 + 32 + 1 + 17 + 1 + 15 == 80
safeprintf(sys.stderr, '%s [%5d] [0x%12x] %-32s %-17s %-15s %-4s %s %s\n',
getFormattedLevelName(level), os.getpid(), thread.get_ident(),
safeprintf(
sys.stderr, '%s [%5d] [0x%12x] %-32s %-17s %-15s %-4s %s %s\n',
getFormattedLevelName(
level), os.getpid(), thread.get_ident(),
o[:32], category, time.strftime("%b %d %H:%M:%S"), "",
message, where)
sys.stderr.flush()
@ -667,14 +672,14 @@ def _preformatLevels(noColorEnvVarName):
t = TerminalController()
formatter = lambda level: ''.join((t.BOLD, getattr(t, COLORS[level]),
format % (_LEVEL_NAMES[level - 1], ), t.NORMAL))
format % (_LEVEL_NAMES[level - 1], ), t.NORMAL))
else:
formatter = lambda level: format % (_LEVEL_NAMES[level - 1], )
for level in ERROR, WARN, FIXME, INFO, DEBUG, LOG:
_FORMATTED_LEVELS.append(formatter(level))
### "public" useful API
# "public" useful API
# setup functions
@ -934,6 +939,7 @@ def outputToFiles(stdout=None, stderr=None):
class BaseLoggable(object):
"""
Base class for objects that want to be able to log messages with
different level of severity. The levels are, in order from least
@ -966,37 +972,43 @@ class BaseLoggable(object):
"""Log an error. By default this will also raise an exception."""
if _canShortcutLogging(self.logCategory, ERROR):
return
errorObject(self.logObjectName(), self.logCategory, *self.logFunction(*args))
errorObject(self.logObjectName(),
self.logCategory, *self.logFunction(*args))
def warning(self, *args):
"""Log a warning. Used for non-fatal problems."""
if _canShortcutLogging(self.logCategory, WARN):
return
warningObject(self.logObjectName(), self.logCategory, *self.logFunction(*args))
warningObject(
self.logObjectName(), self.logCategory, *self.logFunction(*args))
def fixme(self, *args):
"""Log a fixme. Used for FIXMEs ."""
if _canShortcutLogging(self.logCategory, FIXME):
return
fixmeObject(self.logObjectName(), self.logCategory, *self.logFunction(*args))
fixmeObject(self.logObjectName(),
self.logCategory, *self.logFunction(*args))
def info(self, *args):
"""Log an informational message. Used for normal operation."""
if _canShortcutLogging(self.logCategory, INFO):
return
infoObject(self.logObjectName(), self.logCategory, *self.logFunction(*args))
infoObject(self.logObjectName(),
self.logCategory, *self.logFunction(*args))
def debug(self, *args):
"""Log a debug message. Used for debugging."""
if _canShortcutLogging(self.logCategory, DEBUG):
return
debugObject(self.logObjectName(), self.logCategory, *self.logFunction(*args))
debugObject(self.logObjectName(),
self.logCategory, *self.logFunction(*args))
def log(self, *args):
"""Log a log message. Used for debugging recurring events."""
if _canShortcutLogging(self.logCategory, LOG):
return
logObject(self.logObjectName(), self.logCategory, *self.logFunction(*args))
logObject(self.logObjectName(),
self.logCategory, *self.logFunction(*args))
def doLog(self, level, where, format, *args, **kwargs):
"""
@ -1020,7 +1032,7 @@ class BaseLoggable(object):
return {}
args = self.logFunction(*args)
return doLog(level, self.logObjectName(), self.logCategory,
format, args, where=where, **kwargs)
format, args, where=where, **kwargs)
def warningFailure(self, failure, swallow=True):
"""
@ -1035,7 +1047,7 @@ class BaseLoggable(object):
return
return failure
warningObject(self.logObjectName(), self.logCategory,
*self.logFunction(getFailureMessage(failure)))
*self.logFunction(getFailureMessage(failure)))
if not swallow:
return failure
@ -1141,6 +1153,7 @@ def logTwisted():
class TwistedLogObserver(BaseLoggable):
"""
Twisted log observer that integrates with our logging.
"""
@ -1196,6 +1209,7 @@ class TwistedLogObserver(BaseLoggable):
class Loggable(BaseLoggable):
def __init__(self, logCategory=None):
if logCategory:
self.logCategory = logCategory
@ -1212,4 +1226,4 @@ class Loggable(BaseLoggable):
if _canShortcutLogging(self.logCategory, ERROR):
return
doLog(ERROR, self.logObjectName(), self.logCategory,
format, self.logFunction(*args), where=-2)
format, self.logFunction(*args), where=-2)

View file

@ -133,6 +133,7 @@ QA_ASSETS = "gst-qa-assets"
MEDIAS_FOLDER = "medias"
DEFAULT_GST_QA_ASSETS_REPO = "git://people.freedesktop.org/~tsaunier/gst-qa-assets/"
def update_assets(options):
try:
launch_command("cd %s && %s" % (options.clone_dir,
@ -144,7 +145,7 @@ def update_assets(options):
else:
m = ""
printc("Could not update assets repository\n\nError: %s%s" %(e, m),
printc("Could not update assets repository\n\nError: %s%s" % (e, m),
Colors.FAIL, True)
return False
@ -164,70 +165,76 @@ def download_assets(options):
else:
m = ""
printc("Could not download assets\n\nError: %s%s" %(e, m),
printc("Could not download assets\n\nError: %s%s" % (e, m),
Colors.FAIL, True)
return False
return True
class PrintUsage(argparse.Action):
def __init__(self, option_strings, dest=argparse.SUPPRESS, default=argparse.SUPPRESS, help=None):
super(PrintUsage, self).__init__(option_strings=option_strings, dest=dest,
default=default, nargs=0, help=help)
super(
PrintUsage, self).__init__(option_strings=option_strings, dest=dest,
default=default, nargs=0, help=help)
def __call__(self, parser, namespace, values, option_string=None):
print(HELP)
parser.exit()
def main(libsdir):
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter, prog='gst-validate-launcher',
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter, prog='gst-validate-launcher',
description=HELP)
parser.add_argument("-d", "--debug", dest="debug",
action="store_true",
default=False,
help="Let user debug the process on timeout")
action="store_true",
default=False,
help="Let user debug the process on timeout")
parser.add_argument("-f", "--forever", dest="forever",
action="store_true", default=False,
help="Keep running tests until one fails")
action="store_true", default=False,
help="Keep running tests until one fails")
parser.add_argument("-F", "--fatal-error", dest="fatal_error",
action="store_true", default=False,
help="Stop on first fail")
action="store_true", default=False,
help="Stop on first fail")
parser.add_argument("-t", "--wanted-tests", dest="wanted_tests",
default=[],
action="append",
help="Define the tests to execute, it can be a regex"
" if it contains defaults_only, only default scenarios"
" will be executed")
default=[],
action="append",
help="Define the tests to execute, it can be a regex"
" if it contains defaults_only, only default scenarios"
" will be executed")
parser.add_argument("-b", "--blacklisted-tests", dest="blacklisted_tests",
default=[],
action="append",
help="Define the tests not to execute, it can be a regex.")
default=[],
action="append",
help="Define the tests not to execute, it can be a regex.")
parser.add_argument("-L", "--list-tests",
dest="list_tests",
action="store_true",
default=False,
help="List tests and exit")
dest="list_tests",
action="store_true",
default=False,
help="List tests and exit")
parser.add_argument("-m", "--mute", dest="mute",
action="store_true", default=False,
help="Mute playback output, which mean that we use "
"a fakesink")
action="store_true", default=False,
help="Mute playback output, which mean that we use "
"a fakesink")
parser.add_argument("-n", "--no-color", dest="no_color",
action="store_true", default=False,
help="Set it to output no colored text in the terminal")
action="store_true", default=False,
help="Set it to output no colored text in the terminal")
parser.add_argument("-g", "--generate-media-info", dest="generate_info",
action="store_true", default=False,
help="Set it in order to generate the missing .media_infos files")
parser.add_argument("-G", "--generate-media-info-with-frame-detection", dest="generate_info_full",
action="store_true", default=False,
help="Set it in order to generate the missing .media_infos files")
parser.add_argument(
"-G", "--generate-media-info-with-frame-detection", dest="generate_info_full",
action="store_true", default=False,
help="Set it in order to generate the missing .media_infos files"
"It implies --generate-media-info but enabling frame detection")
parser.add_argument("-lt", "--long-test-limit", dest="long_limit",
default=utils.LONG_TEST, action='store',
help="Defines the limite from which a test is concidered as long (in seconds)"),
default=utils.LONG_TEST, action='store',
help="Defines the limite from which a test is concidered as long (in seconds)"),
parser.add_argument("-c", "--config", dest="config",
default=None,
help="""Lets you specify a file where the testsuite to execute is defined.
default=None,
help="""Lets you specify a file where the testsuite to execute is defined.
In this file you will have acces to the TestManager objects that you can configure with
its various methods, for example you can find the 'validate' variable in case the GstValidateManager
launcher is avalaible. You should configure it using:
@ -247,61 +254,69 @@ You can also set default values with:
Note: In the config file, you have acces to the options variable resulting from the parsing of the command line
user argument, you can thus overrides command line options using that.
""")
dir_group = parser.add_argument_group("Directories and files to be used by the launcher")
dir_group = parser.add_argument_group(
"Directories and files to be used by the launcher")
parser.add_argument('--xunit-file', action='store',
dest='xunit_file', metavar="FILE",
default=None,
help=("Path to xml file to store the xunit report in. "
"Default is LOGSDIR/xunit.xml"))
dest='xunit_file', metavar="FILE",
default=None,
help=("Path to xml file to store the xunit report in. "
"Default is LOGSDIR/xunit.xml"))
dir_group.add_argument("-M", "--main-dir", dest="main_dir",
default=DEFAULT_MAIN_DIR,
help="Main directory where to put files. Default is %s" % DEFAULT_MAIN_DIR)
default=DEFAULT_MAIN_DIR,
help="Main directory where to put files. Default is %s" % DEFAULT_MAIN_DIR)
dir_group.add_argument("-o", "--output-dir", dest="output_dir",
default=None,
help="Directory where to store logs and rendered files. Default is MAIN_DIR")
default=None,
help="Directory where to store logs and rendered files. Default is MAIN_DIR")
dir_group.add_argument("-l", "--logs-dir", dest="logsdir",
default=None,
help="Directory where to store logs, default is OUTPUT_DIR/logs."
default=None,
help="Directory where to store logs, default is OUTPUT_DIR/logs."
" Note that 'stdout' and 'sdterr' are valid values that lets you get all the logs"
" printed in the terminal")
dir_group.add_argument("-R", "--render-path", dest="dest",
default=None,
help="Set the path to which projects should be rendered, default is OUTPUT_DIR/rendered")
dir_group.add_argument("-p", "--medias-paths", dest="paths", action="append",
default=None,
help="Set the path to which projects should be rendered, default is OUTPUT_DIR/rendered")
dir_group.add_argument(
"-p", "--medias-paths", dest="paths", action="append",
default=None,
help="Paths in which to look for media files, default is MAIN_DIR/gst-qa-assets/media")
dir_group.add_argument("-a", "--clone-dir", dest="clone_dir",
default=None,
help="Paths in which to look for media files, default is MAIN_DIR/gst-qa-assets")
default=None,
help="Paths in which to look for media files, default is MAIN_DIR/gst-qa-assets")
http_server_group = parser.add_argument_group("Handle the HTTP server to be created")
http_server_group.add_argument("--http-server-port", dest="http_server_port",
http_server_group = parser.add_argument_group(
"Handle the HTTP server to be created")
http_server_group.add_argument(
"--http-server-port", dest="http_server_port",
default=8079,
help="Port on which to run the http server on localhost")
http_server_group.add_argument("--http-bandwith-limitation", dest="http_bandwith",
http_server_group.add_argument(
"--http-bandwith-limitation", dest="http_bandwith",
default=1024 * 1024,
help="The artificial bandwith limitation to introduce to the local server (in Bytes/sec) (default: 1 MBps)")
http_server_group.add_argument("-s", "--folder-for-http-server", dest="http_server_dir",
http_server_group.add_argument(
"-s", "--folder-for-http-server", dest="http_server_dir",
default=None,
help="Folder in which to create an http server on localhost. Default is PATHS")
http_server_group.add_argument("--http-only", dest="httponly",
default=False, action='store_true',
help="Start the http server and quit")
default=False, action='store_true',
help="Start the http server and quit")
assets_group = parser.add_argument_group("Handle remote assets")
assets_group.add_argument("-u", "--update-assets-command", dest="update_assets_command",
assets_group.add_argument(
"-u", "--update-assets-command", dest="update_assets_command",
default="git fetch origin && git checkout origin/master && git annex get .",
help="Command to update assets")
assets_group.add_argument("--get-assets-command", dest="get_assets_command",
assets_group.add_argument(
"--get-assets-command", dest="get_assets_command",
default="git clone",
help="Command to get assets")
assets_group.add_argument("--remote-assets-url", dest="remote_assets_url",
default=DEFAULT_GST_QA_ASSETS_REPO,
help="Url to the remote assets (default:%s)" % DEFAULT_GST_QA_ASSETS_REPO)
default=DEFAULT_GST_QA_ASSETS_REPO,
help="Url to the remote assets (default:%s)" % DEFAULT_GST_QA_ASSETS_REPO)
assets_group.add_argument("-S", "--sync", dest="sync", action="store_true",
default=False, help="Synchronize asset repository")
default=False, help="Synchronize asset repository")
assets_group.add_argument("--usage", dest="sync", action=PrintUsage,
help="Print usage documentation")
help="Print usage documentation")
loggable.init("GST_VALIDATE_LAUNCHER_DEBUG", True, False)
@ -342,7 +357,6 @@ user argument, you can thus overrides command line options using that.
if options.generate_info_full is True:
options.generate_info = True
if options.http_server_dir is None:
if isinstance(options.paths, list):
options.http_server_dir = options.paths[0]
@ -377,7 +391,7 @@ user argument, you can thus overrides command line options using that.
for test in l:
printc(test)
printc("\nNumber of tests: %d" % len (l), Colors.OKGREEN)
printc("\nNumber of tests: %d" % len(l), Colors.OKGREEN)
return 0
httpsrv = HTTPServer(options)

View file

@ -125,13 +125,17 @@ class Reporter(Loggable):
print "\n"
lenstat = (len("Statistics") + 1)
printc("Statistics:\n%s" %(lenstat * "-"), Colors.OKBLUE)
printc("Statistics:\n%s" % (lenstat * "-"), Colors.OKBLUE)
printc("\n%sTotal time spent: %s seconds\n" %
((lenstat * " "), datetime.timedelta(seconds=(time.time() - self._start_time))),
((lenstat * " "), datetime.timedelta(
seconds=(time.time() - self._start_time))),
Colors.OKBLUE)
printc("%sPassed: %d" % (lenstat * " ", self.stats["passed"]), Colors.OKGREEN)
printc("%sFailed: %d" % (lenstat * " ", self.stats["failures"]), Colors.FAIL)
printc("%s%s" %(lenstat * " ", (len("Failed: 0")) * "-"), Colors.OKBLUE)
printc("%sPassed: %d" %
(lenstat * " ", self.stats["passed"]), Colors.OKGREEN)
printc("%sFailed: %d" %
(lenstat * " ", self.stats["failures"]), Colors.FAIL)
printc("%s%s" %
(lenstat * " ", (len("Failed: 0")) * "-"), Colors.OKBLUE)
total = self.stats["failures"] + self.stats["passed"]
color = Colors.WARNING
@ -144,6 +148,7 @@ class Reporter(Loggable):
class XunitReporter(Reporter):
"""This reporter provides test results in the standard XUnit XML format."""
name = 'xunit'
encoding = 'UTF-8'
@ -167,7 +172,8 @@ class XunitReporter(Reporter):
escape_cdata(value)
for extralog in self._current_test.extra_logfiles:
captured += "\n\n===== %s =====\n\n" % escape_cdata(os.path.basename(extralog))
captured += "\n\n===== %s =====\n\n" % escape_cdata(
os.path.basename(extralog))
value = self._current_test.get_extra_log_content(extralog)
captured += escape_cdata(value)
@ -194,10 +200,10 @@ class XunitReporter(Reporter):
self.stats['encoding'] = self.encoding
self.stats['total'] = (self.stats['timeout'] + self.stats['failures']
+ self.stats['passed'] + self.stats['skipped'])
self.xml_file.write( u'<?xml version="1.0" encoding="%(encoding)s"?>'
u'<testsuite name="gst-validate-launcher" tests="%(total)d" '
u'errors="%(timeout)d" failures="%(failures)d" '
u'skip="%(skipped)d">' % self.stats)
self.xml_file.write(u'<?xml version="1.0" encoding="%(encoding)s"?>'
u'<testsuite name="gst-validate-launcher" tests="%(total)d" '
u'errors="%(timeout)d" failures="%(failures)d" '
u'skip="%(skipped)d">' % self.stats)
self.xml_file.write(u''.join([self._forceUnicode(e)
for e in self.errorlist]))
self.xml_file.write(u'</testsuite>')

View file

@ -32,11 +32,12 @@ from operator import itemgetter
GST_SECOND = long(1000000000)
DEFAULT_TIMEOUT = 30
DEFAULT_MAIN_DIR = os.path.expanduser("~/gst-validate/")
DEFAULT_GST_QA_ASSETS = os.path.join(DEFAULT_MAIN_DIR, "gst-qa-assets")
DEFAULT_GST_QA_ASSETS = os.path.join(DEFAULT_MAIN_DIR, "gst-qa-assets")
DISCOVERER_COMMAND = "gst-discoverer-1.0"
# Use to set the duration from which a test is concidered as being 'long'
LONG_TEST = 40
class Result(object):
NOT_RUN = "Not run"
FAILED = "Failed"
@ -139,21 +140,23 @@ def url2path(url):
path = urlparse.urlparse(url).path
if "win32" in sys.platform:
if path[0] == '/':
return path[1:] # We need to remove the first '/' on windows
return path[1:] # We need to remove the first '/' on windows
return path
def isuri(string):
url = urlparse.urlparse(string)
if url.scheme != "" and url.scheme != "":
if url.scheme != "" and url.scheme != "":
return True
return False
def touch(fname, times=None):
with open(fname, 'a'):
os.utime(fname, times)
def get_subclasses(klass, env):
subclasses = []
for symb in env.iteritems():
@ -165,22 +168,29 @@ def get_subclasses(klass, env):
return subclasses
def TIME_ARGS(time):
return "%u:%02u:%02u.%09u" % (time / (GST_SECOND * 60 * 60),
(time / (GST_SECOND * 60)) % 60,
(time / GST_SECOND) % 60,
time % GST_SECOND)
##################################################
# Some utilities to parse gst-validate output #
##################################################
#
# Some utilities to parse gst-validate output #
#
def gsttime_from_tuple(stime):
return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
timeregex = re.compile(r'(?P<_0>.+):(?P<_1>.+):(?P<_2>.+)\.(?P<_3>.+)')
def parse_gsttimeargs(time):
stime = map(itemgetter(1), sorted(timeregex.match(time).groupdict().items()))
return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
stime = map(itemgetter(1), sorted(
timeregex.match(time).groupdict().items()))
return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3]))
def get_duration(media_file):
@ -189,7 +199,8 @@ def get_duration(media_file):
try:
res = subprocess.check_output([DISCOVERER_COMMAND, media_file])
except subprocess.CalledProcessError:
# gst-media-check returns !0 if seeking is not possible, we do not care in that case.
# gst-media-check returns !0 if seeking is not possible, we do not care
# in that case.
pass
for l in res.split('\n'):