validate: tools: Refactor and add a GstValidateTranscodeTest class

This commit is contained in:
Thibault Saunier 2014-01-09 09:14:27 +01:00
parent e591882794
commit dac5b38a4d
5 changed files with 373 additions and 245 deletions

View file

@ -18,88 +18,32 @@
# Boston, MA 02110-1301, USA. # Boston, MA 02110-1301, USA.
import os import os
import time import urlparse
from urllib import unquote from urllib import unquote
from urlparse import urlsplit
from gi.repository import GES, Gst, GLib from gi.repository import GES, Gst, GLib
from testdefinitions import Test, DEFAULT_QA_SAMPLE_PATH, TestsManager from testdefinitions import GstValidateTest, DEFAULT_GST_QA_ASSETS, TestsManager
from utils import MediaFormatCombination, get_profile, Result, get_current_position, get_current_size
DURATION_TOLERANCE = Gst.SECOND / 2 DURATION_TOLERANCE = Gst.SECOND / 2
DEFAULT_GES_LAUNCH = "ges-launch-1.0" DEFAULT_GES_LAUNCH = "ges-launch-1.0"
class Combination(object):
def __str__(self):
return "%s and %s in %s" % (self.acodec, self.vcodec, self.container)
def __init__(self, container, acodec, vcodec):
self.container = container
self.acodec = acodec
self.vcodec = vcodec
FORMATS = {"aac": "audio/mpeg,mpegversion=4",
"ac3": "audio/x-ac3",
"vorbis": "audio/x-vorbis",
"mp3": "audio/mpeg,mpegversion=1,layer=3",
"h264": "video/x-h264",
"vp8": "video/x-vp8",
"theora": "video/x-theora",
"ogg": "application/ogg",
"mkv": "video/x-matroska",
"mp4": "video/quicktime,variant=iso;",
"webm": "video/x-matroska"}
COMBINATIONS = [ COMBINATIONS = [
Combination("ogg", "vorbis", "theora"), MediaFormatCombination("ogg", "vorbis", "theora"),
Combination("webm", "vorbis", "vp8"), MediaFormatCombination("webm", "vorbis", "vp8"),
Combination("mp4", "mp3", "h264"), MediaFormatCombination("mp4", "mp3", "h264"),
Combination("mkv", "vorbis", "h264")] MediaFormatCombination("mkv", "vorbis", "h264")]
SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"] SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"]
def get_profile_full(muxer, venc, aenc, video_restriction=None,
audio_restriction=None,
audio_presence=0, video_presence=0):
ret = "\""
if muxer:
ret += muxer
ret += ":"
if venc:
if video_restriction is not None:
ret = ret + video_restriction + '->'
ret += venc
if video_presence:
ret = ret + '|' + str(video_presence)
if aenc:
ret += ":"
if audio_restriction is not None:
ret = ret + audio_restriction + '->'
ret += aenc
if audio_presence:
ret = ret + '|' + str(audio_presence)
ret += "\""
return ret.replace("::", ":")
def get_profile(combination):
return get_profile_full(FORMATS[combination.container],
FORMATS[combination.vcodec],
FORMATS[combination.acodec],
video_restriction="video/x-raw,format=I420")
def quote_uri(uri): def quote_uri(uri):
""" """
Encode a URI/path according to RFC 2396, without touching the file:/// part. Encode a URI/path according to RFC 2396, without touching the file:/// part.
""" """
# Split off the "file:///" part, if present. # Split off the "file:///" part, if present.
parts = urlsplit(uri, allow_fragments=False) parts = urlparse.urlsplit(uri, allow_fragments=False)
# Make absolutely sure the string is unquoted before quoting again! # Make absolutely sure the string is unquoted before quoting again!
raw_path = unquote(parts.path) raw_path = unquote(parts.path)
# For computing thumbnail md5 hashes in the media library, we must adhere to # For computing thumbnail md5 hashes in the media library, we must adhere to
@ -107,13 +51,12 @@ def quote_uri(uri):
return Gst.filename_to_uri(raw_path) return Gst.filename_to_uri(raw_path)
class GESTest(Test): class GESTest(GstValidateTest):
def __init__(self, classname, options, reporter, project_uri, scenario, def __init__(self, classname, options, reporter, project_uri, scenario=None,
combination=None): combination=None):
super(GESTest, self).__init__(DEFAULT_GES_LAUNCH, classname, options, reporter, super(GESTest, self).__init__(DEFAULT_GES_LAUNCH, classname, options, reporter,
scenario) scenario=scenario)
self.project_uri = project_uri self.project_uri = project_uri
self.combination = combination
proj = GES.Project.new(project_uri) proj = GES.Project.new(project_uri)
tl = proj.extract() tl = proj.extract()
if tl is None: if tl is None:
@ -125,18 +68,6 @@ class GESTest(Test):
else: else:
self.duration = 2 * 60 self.duration = 2 * 60
def set_rendering_info(self):
self.dest_file = os.path.join(self.options.dest,
os.path.basename(self.project_uri) +
'-' + self.combination.acodec +
self.combination.vcodec + '.' +
self.combination.container)
if not Gst.uri_is_valid(self.dest_file):
self.dest_file = GLib.filename_to_uri(self.dest_file, None)
profile = get_profile(self.combination)
self.add_arguments("-f", profile, "-o", self.dest_file)
def set_sample_paths(self): def set_sample_paths(self):
if not self.options.paths: if not self.options.paths:
if not self.options.recurse_paths: if not self.options.recurse_paths:
@ -160,9 +91,7 @@ class GESTest(Test):
self.add_arguments("--sample-paths", "file://" + path) self.add_arguments("--sample-paths", "file://" + path)
def build_arguments(self): def build_arguments(self):
Test.build_arguments(self) GstValidateTest.build_arguments(self)
if self.combination is not None:
self.set_rendering_info()
if self.options.mute: if self.options.mute:
self.add_arguments(" --mute") self.add_arguments(" --mute")
@ -170,9 +99,40 @@ class GESTest(Test):
self.set_sample_paths() self.set_sample_paths()
self.add_arguments("-l", self.project_uri) self.add_arguments("-l", self.project_uri)
class GESPlaybackTest(GESTest):
def __init__(self, classname, options, reporter, project_uri, scenario):
super(GESPlaybackTest, self).__init__(classname, options, reporter,
project_uri, scenario=scenario)
def get_current_value(self):
return get_current_position(self)
class GESRenderTest(GESTest):
def __init__(self, classname, options, reporter, project_uri, combination):
super(GESRenderTest, self).__init__(classname, options, reporter,
project_uri)
self.combination = combination
def build_arguments(self):
GESTest.build_arguments(self)
self._set_rendering_info()
def _set_rendering_info(self):
self.dest_file = os.path.join(self.options.dest,
os.path.basename(self.project_uri) +
'-' + self.combination.acodec +
self.combination.vcodec + '.' +
self.combination.container)
if not Gst.uri_is_valid(self.dest_file):
self.dest_file = GLib.filename_to_uri(self.dest_file, None)
profile = get_profile(self.combination)
self.add_arguments("-f", profile, "-o", self.dest_file)
def check_results(self): def check_results(self):
if self.process.returncode == 0: if self.process.returncode == 0:
if self.combination:
try: try:
asset = GES.UriClipAsset.request_sync(self.dest_file) asset = GES.UriClipAsset.request_sync(self.dest_file)
if self.duration - DURATION_TOLERANCE <= asset.get_duration() \ if self.duration - DURATION_TOLERANCE <= asset.get_duration() \
@ -186,11 +146,8 @@ class GESTest(Test):
self.set_result(Result.PASSED) self.set_result(Result.PASSED)
except GLib.Error as e: except GLib.Error as e:
self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e) self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e)
else: else:
self.set_result(Result.PASSED) if self.result == Result.TIMEOUT:
else:
if self.combination and self.result == Result.TIMEOUT:
missing_eos = False missing_eos = False
try: try:
asset = GES.UriClipAsset.request_sync(self.dest_file) asset = GES.UriClipAsset.request_sync(self.dest_file)
@ -203,65 +160,22 @@ class GESTest(Test):
self.set_result(Result.TIMEOUT, "The rendered file add right duration, MISSING EOS?\n", self.set_result(Result.TIMEOUT, "The rendered file add right duration, MISSING EOS?\n",
"failure", e) "failure", e)
else: else:
Test.check_results(self) GstValidateTest.check_results(self)
def wait_process(self):
last_val = 0
last_change_ts = time.time()
while True:
self.process.poll()
if self.process.returncode is not None:
self.check_results()
break
# Dirty way to avoid eating to much CPU... good enough for us anyway.
time.sleep(1)
if self.combination:
val = os.stat(GLib.filename_from_uri(self.dest_file)[0]).st_size
else:
val = self.get_last_position()
if val == last_val:
if time.time() - last_change_ts > 10:
self.result = Result.TIMEOUT
else:
last_change_ts = time.time()
last_val = val
def get_last_position(self):
self.reporter.out.seek(0)
m = None
for l in self.reporter.out.readlines():
if "<Position:" in l:
m = l
if m is None:
return ""
pos = ""
for j in m.split("\r"):
if j.startswith("<Position:") and j.endswith("/>"):
pos = j
return pos
def get_current_value(self):
return get_current_size(self)
class GESTestsManager(TestsManager): class GESTestsManager(TestsManager):
name = "ges" name = "ges"
def __init__(self): def __init__(self):
super(GESTestsManager, self).__init__() super(GESTestsManager, self).__init__()
Gst.init(None) Gst.init(None)
GES.init() GES.init()
def add_options(self, group): def add_options(self, group):
group.add_option("-o", "--output-path", dest="dest",
default=None,
help="Set the path to which projects should be"
" renderd")
group.add_option("-P", "--projects-paths", dest="projects_paths", group.add_option("-P", "--projects-paths", dest="projects_paths",
default=os.path.join(DEFAULT_QA_SAMPLE_PATH, "ges-projects"), default=os.path.join(DEFAULT_GST_QA_ASSETS, "ges-projects"),
help="Paths in which to look for moved medias") help="Paths in which to look for moved medias")
group.add_option("-r", "--recurse-paths", dest="recurse_paths", group.add_option("-r", "--recurse-paths", dest="recurse_paths",
default=False, action="store_true", default=False, action="store_true",
@ -270,12 +184,6 @@ class GESTestsManager(TestsManager):
def set_settings(self, options, args, reporter): def set_settings(self, options, args, reporter):
TestsManager.set_settings(self, options, args, reporter) TestsManager.set_settings(self, options, args, reporter)
if options.dest is None:
options.dest = os.path.join(options.logsdir, "rendered")
if not Gst.uri_is_valid(options.dest):
options.dest = GLib.filename_to_uri(options.dest, None)
try: try:
os.makedirs(GLib.filename_from_uri(options.dest)[0]) os.makedirs(GLib.filename_from_uri(options.dest)[0])
print "Created directory: %s" % options.dest print "Created directory: %s" % options.dest
@ -307,21 +215,18 @@ class GESTestsManager(TestsManager):
# First playback casses # First playback casses
for scenario in SCENARIOS: for scenario in SCENARIOS:
classname = "ges.playback.%s.%s" % (scenario, os.path.basename(proj).replace(".xges", "")) classname = "ges.playback.%s.%s" % (scenario, os.path.basename(proj).replace(".xges", ""))
self.tests.append(GESTest(classname, self.tests.append(GESPlaybackTest(classname,
self.options, self.options,
self.reporter, self.reporter,
proj, proj,
scenario) scenario=scenario)
) )
# And now rendering casses # And now rendering casses
for comb in COMBINATIONS: for comb in COMBINATIONS:
classname = "ges.render.%s.%s" % (str(comb).replace(' ', '_'), classname = "ges.render.%s.%s" % (str(comb).replace(' ', '_'),
os.path.basename(proj).replace(".xges", "")) os.path.splitext(os.path.basename(proj))[0])
self.tests.append(GESTest(classname, self.tests.append(GESRenderTest(classname, self.options,
self.options, self.reporter, proj,
self.reporter, combination=comb)
proj,
None,
comb)
) )

View file

@ -17,58 +17,87 @@
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA. # Boston, MA 02110-1301, USA.
import os import os
import subprocess
import urlparse import urlparse
import urllib import subprocess
import ConfigParser import ConfigParser
from loggable import Loggable from loggable import Loggable
from testdefinitions import Test, TestsManager, DEFAULT_TIMEOUT from testdefinitions import GstValidateTest, TestsManager, DEFAULT_TIMEOUT
from utils import MediaFormatCombination, get_profile, path2url, Result, get_current_position, get_current_size
DEFAULT_GST_VALIDATE = "gst-validate-1.0" DEFAULT_GST_VALIDATE = "gst-validate-1.0"
DEFAULT_GST_VALIDATE_TRANSCODING = "gst-validate-transcoding-1.0"
DISCOVERER_COMMAND = ["gst-validate-media-check-1.0"] DISCOVERER_COMMAND = ["gst-validate-media-check-1.0"]
MEDIA_INFO_EXT = "media_info" MEDIA_INFO_EXT = "media_info"
STREAM_INFO = "stream_info" STREAM_INFO = "stream_info"
SEEKING_REQUIERED_SCENARIO=["seek_forward", "seek_backward", "scrub_forward_seeking"] SEEKING_REQUIERED_SCENARIO = ["seek_forward", "seek_backward", "scrub_forward_seeking"]
SPECIAL_PROTOCOLS = [("application/x-hls", "hls")] SPECIAL_PROTOCOLS = [("application/x-hls", "hls")]
all_tests = { PLAYBACK_TESTS = ["playbin uri=__uri__ audio_sink=autoaudiosink video_sink=autovideosink"]
"playback": SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"]
{
"pipelines": ["playbin uri=__uri__ audio_sink=autoaudiosink video_sink=autovideosink"], COMBINATIONS = [
"scenarios": ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"] MediaFormatCombination("ogg", "vorbis", "theora"),
}, MediaFormatCombination("webm", "vorbis", "vp8"),
} MediaFormatCombination("mp4", "mp3", "h264"),
MediaFormatCombination("mkv", "vorbis", "h264")]
def path2url(path): class GstValidateLaunchTest(GstValidateTest):
return urlparse.urljoin('file:', urllib.pathname2url(path)) def __init__(self, classname, options, reporter, pipeline_desc,
timeout=DEFAULT_TIMEOUT, scenario=None, file_infos=None):
class GstValidateTest(Test):
def __init__(self,
classname,
options,
reporter,
pipeline_desc,
scenario,
file_infos=None,
timeout=DEFAULT_TIMEOUT):
if file_infos is not None: if file_infos is not None:
timeout = file_infos.get("media-info", "file-duration") timeout = file_infos.get("media-info", "file-duration")
super(GstValidateTest, self).__init__(DEFAULT_GST_VALIDATE, classname, super(GstValidateLaunchTest, self).__init__(DEFAULT_GST_VALIDATE, classname,
options, reporter, scenario, options, reporter, timeout=timeout,
timeout) scenario=scenario,)
self.pipeline_desc = pipeline_desc self.pipeline_desc = pipeline_desc
self.file_infos = file_infos self.file_infos = file_infos
def build_arguments(self): def build_arguments(self):
Test.build_arguments(self) GstValidateTest.build_arguments(self)
self.add_arguments(self.pipeline_desc) self.add_arguments(self.pipeline_desc)
def get_current_value(self):
return get_current_position(self)
class GstValidateTranscodingTest(GstValidateTest):
def __init__(self, classname, options, reporter,
combination, uri, file_infos):
if file_infos is not None:
timeout = file_infos.get("media-info", "file-duration")
super(GstValidateTranscodingTest, self).__init__(
DEFAULT_GST_VALIDATE_TRANSCODING, classname,
options, reporter, timeout=timeout, scenario=None)
self.uri = uri
self.combination = combination
self.dest_file = ""
def set_rendering_info(self):
self.dest_file = os.path.join(self.options.dest,
os.path.basename(self.uri) +
'-' + self.combination.acodec +
self.combination.vcodec + '.' +
self.combination.container)
if urlparse.urlparse(self.dest_file).scheme == "":
self.dest_file = path2url(self.dest_file)
profile = get_profile(self.combination)
self.add_arguments("-o", profile)
def build_arguments(self):
self.set_rendering_info()
self.add_arguments(self.uri, self.dest_file)
def get_current_value(self):
return get_current_size(self)
class GstValidateManager(TestsManager, Loggable): class GstValidateManager(TestsManager, Loggable):
@ -86,11 +115,22 @@ class GstValidateManager(TestsManager, Loggable):
% DISCOVERER_COMMAND[0]) % DISCOVERER_COMMAND[0])
def list_tests(self): def list_tests(self):
for mainname, tests in all_tests.iteritems(): for test_pipeline in PLAYBACK_TESTS:
name = "validate.%s" % (mainname) name = "validate.playback"
for pipe in tests["pipelines"]: for scenario in SCENARIOS:
for scenario in tests["scenarios"]: self._add_playback_test(name, scenario, test_pipeline)
self._add_test(name, scenario, pipe)
for uri, config in self._list_uris():
for comb in COMBINATIONS:
classname = "validate.transcode"
classname = "validate.transcode.from_%s.to_%s" % (os.path.splitext(os.path.basename(uri))[0],
str(comb).replace(' ', '_'))
self.tests.append(GstValidateTranscodingTest(classname,
self.options,
self.reporter,
comb,
uri,
config))
def _check_discovering_info(self, media_info, uri=None): def _check_discovering_info(self, media_info, uri=None):
self.debug("Checking %s", media_info) self.debug("Checking %s", media_info)
@ -196,17 +236,17 @@ class GstValidateManager(TestsManager, Loggable):
os.path.basename(uri).replace(".", "_")) os.path.basename(uri).replace(".", "_"))
self.debug("Adding: %s", fname) self.debug("Adding: %s", fname)
self.tests.append(GstValidateTest(fname, self.tests.append(GstValidateLaunchTest(fname,
self.options, self.options,
self.reporter, self.reporter,
npipe.replace("__uri__", uri), npipe.replace("__uri__", uri),
scenario, scenario=scenario,
config) file_infos=config)
) )
else: else:
self.debug("Adding: %s", name) self.debug("Adding: %s", name)
self.tests.append(GstValidateTest(self._get_fname(fname, scenario), self.tests.append(GstValidateLaunchTest(self._get_fname(fname, scenario),
self.options, self.options,
self.reporter, self.reporter,
pipe, pipe,
scenario)) scenario=scenario))

View file

@ -19,9 +19,11 @@
import os import os
import loggable import loggable
from testdefinitions import _TestsLauncher, DEFAULT_QA_SAMPLE_PATH import urlparse
from utils import printc
from utils import printc, path2url
from optparse import OptionParser from optparse import OptionParser
from testdefinitions import _TestsLauncher, DEFAULT_GST_QA_ASSETS
def main(): def main():
@ -54,12 +56,16 @@ def main():
action="store_true", default=os.path.expanduser("~/gst-validate/logs/"), action="store_true", default=os.path.expanduser("~/gst-validate/logs/"),
help="Directory where to store logs") help="Directory where to store logs")
parser.add_option("-p", "--medias-paths", dest="paths", parser.add_option("-p", "--medias-paths", dest="paths",
default=[os.path.join(DEFAULT_QA_SAMPLE_PATH, "medias")], default=[os.path.join(DEFAULT_GST_QA_ASSETS, "medias")],
help="Paths in which to look for media files") help="Paths in which to look for media files")
parser.add_option("-m", "--mute", dest="mute", parser.add_option("-m", "--mute", dest="mute",
action="store_true", default=False, action="store_true", default=False,
help="Mute playback output, which mean that we use " help="Mute playback output, which mean that we use "
"a fakesink") "a fakesink")
parser.add_option("-o", "--output-path", dest="dest",
default=None,
help="Set the path to which projects should be"
" renderd")
loggable.init("GST_VALIDATE_LAUNCHER_DEBUG", True, False) loggable.init("GST_VALIDATE_LAUNCHER_DEBUG", True, False)
tests_launcher = _TestsLauncher() tests_launcher = _TestsLauncher()
@ -67,6 +73,10 @@ def main():
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
if options.xunit_file is None: if options.xunit_file is None:
options.xunit_file = os.path.join(options.logsdir, "xunit.xml") options.xunit_file = os.path.join(options.logsdir, "xunit.xml")
if options.dest is None:
options.dest = os.path.join(options.logsdir, "rendered")
if urlparse.urlparse(options.dest).scheme == "":
options.dest = path2url(options.dest)
tests_launcher.set_settings(options, args) tests_launcher.set_settings(options, args)
tests_launcher.list_tests() tests_launcher.list_tests()
if options.list_tests: if options.list_tests:

View file

@ -31,14 +31,15 @@ from utils import mkdir, Result, Colors, printc
DEFAULT_TIMEOUT = 10 DEFAULT_TIMEOUT = 10
DEFAULT_QA_SAMPLE_PATH = os.path.join(os.path.expanduser('~'), "Videos", DEFAULT_GST_QA_ASSETS = os.path.join(os.path.expanduser('~'), "Videos",
"gst-qa-samples") "gst-qa-assets")
class Test(Loggable): class Test(Loggable):
""" A class representing a particular test. """ """ A class representing a particular test. """
def __init__(self, application_name, classname, options, reporter, scenario=None, timeout=DEFAULT_TIMEOUT): def __init__(self, application_name, classname, options,
reporter, timeout=DEFAULT_TIMEOUT):
Loggable.__init__(self) Loggable.__init__(self)
self.timeout = timeout self.timeout = timeout
self.classname = classname self.classname = classname
@ -47,10 +48,6 @@ class Test(Loggable):
self.command = "" self.command = ""
self.reporter = reporter self.reporter = reporter
self.process = None self.process = None
if scenario.lower() == "none":
self.scenario = None
else:
self.scenario = scenario
self.message = "" self.message = ""
self.error = "" self.error = ""
@ -73,8 +70,7 @@ class Test(Loggable):
self.command += " " + arg self.command += " " + arg
def build_arguments(self): def build_arguments(self):
if self.scenario is not None: pass
self.add_arguments("--set-scenario", self.scenario)
def set_result(self, result, message="", error=""): def set_result(self, result, message="", error=""):
self.result = result self.result = result
@ -82,7 +78,10 @@ class Test(Loggable):
self.error = error self.error = error
def check_results(self): def check_results(self):
self.debug("%s returncode: %d", self, self.process.returncode) if self.result is Result.FAILED:
return
self.debug("%s returncode: %s", self, self.process.returncode)
if self.result == Result.TIMEOUT: if self.result == Result.TIMEOUT:
self.set_result(Result.TIMEOUT, "Application timed out", "timeout") self.set_result(Result.TIMEOUT, "Application timed out", "timeout")
elif self.process.returncode == 0: elif self.process.returncode == 0:
@ -97,25 +96,100 @@ class Test(Loggable):
self.set_result(Result.FAILED, self.set_result(Result.FAILED,
"Application returned %d (issues: %s)" % ( "Application returned %d (issues: %s)" % (
self.process.returncode, self.process.returncode,
self.get_validate_criticals_errors()),
"error") "error")
)
def get_current_value(self):
"""
Lets subclasses implement a nicer timeout measurement method
They should return some value with which we will compare
the previous and timeout if they are egual during self.timeout
seconds
"""
return Result.NOT_RUN
def wait_process(self): def wait_process(self):
last_val = 0
last_change_ts = time.time() last_change_ts = time.time()
while True: while True:
self.process.poll() self.process.poll()
if self.process.returncode is not None: if self.process.returncode is not None:
break break
if time.time() - last_change_ts > self.timeout:
self.result = Result.TIMEOUT
# Dirty way to avoid eating to much CPU... # Dirty way to avoid eating to much CPU...
# good enough for us anyway. # good enough for us anyway.
time.sleep(1) time.sleep(1)
val = self.get_current_value()
if val is Result.NOT_RUN:
# The get_current_value logic is not implemented... dumb timeout
if time.time() - last_change_ts > self.timeout:
self.result = Result.TIMEOUT
break
continue
elif val is Result.FAILED:
self.result = Result.FAILED
break
self.log("New val %s" % val)
if val == last_val:
delta = time.time() - last_change_ts
self.debug("Same value for %d seconds" % delta)
if delta > self.timeout:
self.result = Result.TIMEOUT
break
else:
last_change_ts = time.time()
last_val = val
self.check_results() self.check_results()
def run(self):
self.command = "%s " % (self.application)
self._starting_time = time.time()
self.build_arguments()
printc("Launching:%s '%s' -- logs are in %s" % (Colors.ENDC, self.command,
self.reporter.out.name), Colors.OKBLUE)
try:
self.process = subprocess.Popen(self.command,
stderr=self.reporter.out,
stdout=self.reporter.out,
shell=True)
self.wait_process()
except KeyboardInterrupt:
self.process.kill()
raise
try:
self.process.terminate()
except OSError:
pass
self.time_taken = time.time() - self._starting_time
class GstValidateTest(Test):
""" A class representing a particular test. """
def __init__(self, application_name, classname,
options, reporter, timeout=DEFAULT_TIMEOUT,
scenario=None):
super(GstValidateTest, self).__init__(application_name, classname, options,
reporter, timeout=DEFAULT_TIMEOUT)
if scenario is None or scenario.lower() == "none":
self.scenario = None
else:
self.scenario = scenario
def build_arguments(self):
if self.scenario is not None:
self.add_arguments("--set-scenario", self.scenario)
def get_validate_criticals_errors(self): def get_validate_criticals_errors(self):
self.reporter.out.seek(0) self.reporter.out.seek(0)
ret = "[" ret = "["
@ -135,28 +209,6 @@ class Test(Loggable):
else: else:
return ret + "]" return ret + "]"
def run(self):
self.command = "%s " % (self.application)
self._starting_time = time.time()
self.build_arguments()
printc("Launching:%s '%s' -- logs are in %s" % (Colors.ENDC, self.classname,
self.reporter.out.name), Colors.OKBLUE)
try:
self.process = subprocess.Popen(self.command,
stderr=self.reporter.out,
stdout=self.reporter.out,
shell=True)
self.wait_process()
except KeyboardInterrupt:
self.process.kill()
raise
try:
self.process.terminate()
except OSError:
pass
self.time_taken = time.time() - self._starting_time
class TestsManager(object): class TestsManager(object):

View file

@ -19,6 +19,11 @@
""" Some utilies. """ """ Some utilies. """
import os import os
import urllib
import urlparse
GST_SECOND = 1000000000
class Result(object): class Result(object):
@ -70,3 +75,119 @@ def printc (message, color="", title=False):
def launch_command(command, color=None): def launch_command(command, color=None):
printc(command, Colors.OKGREEN, True) printc(command, Colors.OKGREEN, True)
os.system(command) os.system(command)
def path2url(path):
return urlparse.urljoin('file:', urllib.pathname2url(path))
##############################
# Encoding related utils #
##############################
class MediaFormatCombination(object):
def __str__(self):
return "%s and %s in %s" % (self.acodec, self.vcodec, self.container)
def __init__(self, container, acodec, vcodec):
self.container = container
self.acodec = acodec
self.vcodec = vcodec
FORMATS = {"aac": "audio/mpeg,mpegversion=4",
"ac3": "audio/x-ac3",
"vorbis": "audio/x-vorbis",
"mp3": "audio/mpeg,mpegversion=1,layer=3",
"h264": "video/x-h264",
"vp8": "video/x-vp8",
"theora": "video/x-theora",
"ogg": "application/ogg",
"mkv": "video/x-matroska",
"mp4": "video/quicktime,variant=iso;",
"webm": "video/x-matroska"}
def get_profile_full(muxer, venc, aenc, video_restriction=None,
audio_restriction=None,
audio_presence=0, video_presence=0):
ret = "\""
if muxer:
ret += muxer
ret += ":"
if venc:
if video_restriction is not None:
ret = ret + video_restriction + '->'
ret += venc
if video_presence:
ret = ret + '|' + str(video_presence)
if aenc:
ret += ":"
if audio_restriction is not None:
ret = ret + audio_restriction + '->'
ret += aenc
if audio_presence:
ret = ret + '|' + str(audio_presence)
ret += "\""
return ret.replace("::", ":")
def get_profile(combination):
return get_profile_full(FORMATS[combination.container],
FORMATS[combination.vcodec],
FORMATS[combination.acodec],
video_restriction="video/x-raw,format=I420")
##################################################
# Some utilities to parse gst-validate output #
##################################################
def _parse_position(p):
def parse_gsttimeargs(time):
return int(time.split(":")[0]) * 3600 + int(time.split(":")[1]) * 60 + int(time.split(":")[2].split(".")[0]) * 60
start_stop = p.replace("<Position: ", '').replace("/>", '').split(" / ")
return parse_gsttimeargs(start_stop[0]), parse_gsttimeargs(start_stop[1])
def _get_position(test):
position = duration = 0
test.reporter.out.seek(0)
m = None
for l in test.reporter.out.readlines():
if "<Position:" in l:
m = l
if m is None:
return position, duration
for j in m.split("\r"):
if j.startswith("<Position:") and j.endswith("/>"):
position, duration = _parse_position(j)
return position, duration
def get_current_position(test, max_passed_stop=0.5):
position, duration = _get_position(test)
if position > duration + max_passed_stop:
test.set_result(Result.FAILED,
"The position is reported as > than the"
" duration (position: %d > duration: %d)"
% (position, duration))
return Result.FAILED
return position
def get_current_size(test):
position = get_current_position(test)
if position is Result.FAILED:
return position
return os.stat(urlparse.urlparse(test.dest_file).path).st_size