From dac5b38a4d5b985a8c301da6a9b681a1f0d56904 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Thu, 9 Jan 2014 09:14:27 +0100 Subject: [PATCH] validate: tools: Refactor and add a GstValidateTranscodeTest class --- validate/tools/apps/ges-projects-tests.py | 241 +++++++--------------- validate/tools/apps/gst-validate.py | 116 +++++++---- validate/tools/gst-validate-launcher.py | 16 +- validate/tools/testdefinitions.py | 124 +++++++---- validate/tools/utils.py | 121 +++++++++++ 5 files changed, 373 insertions(+), 245 deletions(-) diff --git a/validate/tools/apps/ges-projects-tests.py b/validate/tools/apps/ges-projects-tests.py index 6a16bfff25..fc5c5c6e86 100644 --- a/validate/tools/apps/ges-projects-tests.py +++ b/validate/tools/apps/ges-projects-tests.py @@ -18,88 +18,32 @@ # Boston, MA 02110-1301, USA. import os -import time +import urlparse from urllib import unquote -from urlparse import urlsplit from gi.repository import GES, Gst, GLib -from testdefinitions import Test, DEFAULT_QA_SAMPLE_PATH, TestsManager +from testdefinitions import GstValidateTest, DEFAULT_GST_QA_ASSETS, TestsManager +from utils import MediaFormatCombination, get_profile, Result, get_current_position, get_current_size DURATION_TOLERANCE = Gst.SECOND / 2 DEFAULT_GES_LAUNCH = "ges-launch-1.0" -class Combination(object): - - def __str__(self): - return "%s and %s in %s" % (self.acodec, self.vcodec, self.container) - - def __init__(self, container, acodec, vcodec): - self.container = container - self.acodec = acodec - self.vcodec = vcodec - - -FORMATS = {"aac": "audio/mpeg,mpegversion=4", - "ac3": "audio/x-ac3", - "vorbis": "audio/x-vorbis", - "mp3": "audio/mpeg,mpegversion=1,layer=3", - "h264": "video/x-h264", - "vp8": "video/x-vp8", - "theora": "video/x-theora", - "ogg": "application/ogg", - "mkv": "video/x-matroska", - "mp4": "video/quicktime,variant=iso;", - "webm": "video/x-matroska"} - COMBINATIONS = [ - Combination("ogg", "vorbis", "theora"), - Combination("webm", "vorbis", "vp8"), - Combination("mp4", "mp3", "h264"), - Combination("mkv", "vorbis", "h264")] + MediaFormatCombination("ogg", "vorbis", "theora"), + MediaFormatCombination("webm", "vorbis", "vp8"), + MediaFormatCombination("mp4", "mp3", "h264"), + MediaFormatCombination("mkv", "vorbis", "h264")] SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"] -def get_profile_full(muxer, venc, aenc, video_restriction=None, - audio_restriction=None, - audio_presence=0, video_presence=0): - ret = "\"" - if muxer: - ret += muxer - ret += ":" - if venc: - if video_restriction is not None: - ret = ret + video_restriction + '->' - ret += venc - if video_presence: - ret = ret + '|' + str(video_presence) - if aenc: - ret += ":" - if audio_restriction is not None: - ret = ret + audio_restriction + '->' - ret += aenc - if audio_presence: - ret = ret + '|' + str(audio_presence) - - ret += "\"" - return ret.replace("::", ":") - - - -def get_profile(combination): - return get_profile_full(FORMATS[combination.container], - FORMATS[combination.vcodec], - FORMATS[combination.acodec], - video_restriction="video/x-raw,format=I420") - - def quote_uri(uri): """ Encode a URI/path according to RFC 2396, without touching the file:/// part. """ # Split off the "file:///" part, if present. - parts = urlsplit(uri, allow_fragments=False) + parts = urlparse.urlsplit(uri, allow_fragments=False) # Make absolutely sure the string is unquoted before quoting again! raw_path = unquote(parts.path) # For computing thumbnail md5 hashes in the media library, we must adhere to @@ -107,13 +51,12 @@ def quote_uri(uri): return Gst.filename_to_uri(raw_path) -class GESTest(Test): - def __init__(self, classname, options, reporter, project_uri, scenario, +class GESTest(GstValidateTest): + def __init__(self, classname, options, reporter, project_uri, scenario=None, combination=None): super(GESTest, self).__init__(DEFAULT_GES_LAUNCH, classname, options, reporter, - scenario) + scenario=scenario) self.project_uri = project_uri - self.combination = combination proj = GES.Project.new(project_uri) tl = proj.extract() if tl is None: @@ -125,18 +68,6 @@ class GESTest(Test): else: self.duration = 2 * 60 - def set_rendering_info(self): - self.dest_file = os.path.join(self.options.dest, - os.path.basename(self.project_uri) + - '-' + self.combination.acodec + - self.combination.vcodec + '.' + - self.combination.container) - if not Gst.uri_is_valid(self.dest_file): - self.dest_file = GLib.filename_to_uri(self.dest_file, None) - - profile = get_profile(self.combination) - self.add_arguments("-f", profile, "-o", self.dest_file) - def set_sample_paths(self): if not self.options.paths: if not self.options.recurse_paths: @@ -155,14 +86,12 @@ class GESTest(Test): root, directory) ) - ) + ) else: self.add_arguments("--sample-paths", "file://" + path) def build_arguments(self): - Test.build_arguments(self) - if self.combination is not None: - self.set_rendering_info() + GstValidateTest.build_arguments(self) if self.options.mute: self.add_arguments(" --mute") @@ -170,27 +99,55 @@ class GESTest(Test): self.set_sample_paths() self.add_arguments("-l", self.project_uri) + +class GESPlaybackTest(GESTest): + def __init__(self, classname, options, reporter, project_uri, scenario): + super(GESPlaybackTest, self).__init__(classname, options, reporter, + project_uri, scenario=scenario) + + def get_current_value(self): + return get_current_position(self) + + +class GESRenderTest(GESTest): + def __init__(self, classname, options, reporter, project_uri, combination): + super(GESRenderTest, self).__init__(classname, options, reporter, + project_uri) + self.combination = combination + + def build_arguments(self): + GESTest.build_arguments(self) + self._set_rendering_info() + + def _set_rendering_info(self): + self.dest_file = os.path.join(self.options.dest, + os.path.basename(self.project_uri) + + '-' + self.combination.acodec + + self.combination.vcodec + '.' + + self.combination.container) + if not Gst.uri_is_valid(self.dest_file): + self.dest_file = GLib.filename_to_uri(self.dest_file, None) + + profile = get_profile(self.combination) + self.add_arguments("-f", profile, "-o", self.dest_file) + def check_results(self): if self.process.returncode == 0: - if self.combination: - try: - asset = GES.UriClipAsset.request_sync(self.dest_file) - if self.duration - DURATION_TOLERANCE <= asset.get_duration() \ - <= self.duration + DURATION_TOLERANCE: - self.set_result(Result.FAILURE, "Duration of encoded file is " - " wrong (%s instead of %s)" % - (Gst.TIME_ARGS(self.duration), - Gst.TIME_ARGS(asset.get_duration())), - "wrong-duration") - else: - self.set_result(Result.PASSED) - except GLib.Error as e: - self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e) - - else: - self.set_result(Result.PASSED) + try: + asset = GES.UriClipAsset.request_sync(self.dest_file) + if self.duration - DURATION_TOLERANCE <= asset.get_duration() \ + <= self.duration + DURATION_TOLERANCE: + self.set_result(Result.FAILURE, "Duration of encoded file is " + " wrong (%s instead of %s)" % + (Gst.TIME_ARGS(self.duration), + Gst.TIME_ARGS(asset.get_duration())), + "wrong-duration") + else: + self.set_result(Result.PASSED) + except GLib.Error as e: + self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e) else: - if self.combination and self.result == Result.TIMEOUT: + if self.result == Result.TIMEOUT: missing_eos = False try: asset = GES.UriClipAsset.request_sync(self.dest_file) @@ -203,65 +160,22 @@ class GESTest(Test): self.set_result(Result.TIMEOUT, "The rendered file add right duration, MISSING EOS?\n", "failure", e) else: - Test.check_results(self) - - def wait_process(self): - last_val = 0 - last_change_ts = time.time() - while True: - self.process.poll() - if self.process.returncode is not None: - self.check_results() - break - - # Dirty way to avoid eating to much CPU... good enough for us anyway. - time.sleep(1) - - if self.combination: - val = os.stat(GLib.filename_from_uri(self.dest_file)[0]).st_size - else: - val = self.get_last_position() - - if val == last_val: - if time.time() - last_change_ts > 10: - self.result = Result.TIMEOUT - else: - last_change_ts = time.time() - last_val = val - - - def get_last_position(self): - self.reporter.out.seek(0) - m = None - for l in self.reporter.out.readlines(): - if ""): - pos = j - - return pos + GstValidateTest.check_results(self) + def get_current_value(self): + return get_current_size(self) class GESTestsManager(TestsManager): name = "ges" + def __init__(self): super(GESTestsManager, self).__init__() Gst.init(None) GES.init() def add_options(self, group): - group.add_option("-o", "--output-path", dest="dest", - default=None, - help="Set the path to which projects should be" - " renderd") group.add_option("-P", "--projects-paths", dest="projects_paths", - default=os.path.join(DEFAULT_QA_SAMPLE_PATH, "ges-projects"), + default=os.path.join(DEFAULT_GST_QA_ASSETS, "ges-projects"), help="Paths in which to look for moved medias") group.add_option("-r", "--recurse-paths", dest="recurse_paths", default=False, action="store_true", @@ -270,12 +184,6 @@ class GESTestsManager(TestsManager): def set_settings(self, options, args, reporter): TestsManager.set_settings(self, options, args, reporter) - if options.dest is None: - options.dest = os.path.join(options.logsdir, "rendered") - - if not Gst.uri_is_valid(options.dest): - options.dest = GLib.filename_to_uri(options.dest, None) - try: os.makedirs(GLib.filename_from_uri(options.dest)[0]) print "Created directory: %s" % options.dest @@ -307,21 +215,18 @@ class GESTestsManager(TestsManager): # First playback casses for scenario in SCENARIOS: classname = "ges.playback.%s.%s" % (scenario, os.path.basename(proj).replace(".xges", "")) - self.tests.append(GESTest(classname, - self.options, - self.reporter, - proj, - scenario) + self.tests.append(GESPlaybackTest(classname, + self.options, + self.reporter, + proj, + scenario=scenario) ) # And now rendering casses for comb in COMBINATIONS: classname = "ges.render.%s.%s" % (str(comb).replace(' ', '_'), - os.path.basename(proj).replace(".xges", "")) - self.tests.append(GESTest(classname, - self.options, - self.reporter, - proj, - None, - comb) + os.path.splitext(os.path.basename(proj))[0]) + self.tests.append(GESRenderTest(classname, self.options, + self.reporter, proj, + combination=comb) ) diff --git a/validate/tools/apps/gst-validate.py b/validate/tools/apps/gst-validate.py index 95e4ffaeac..0b83a603d1 100644 --- a/validate/tools/apps/gst-validate.py +++ b/validate/tools/apps/gst-validate.py @@ -17,58 +17,87 @@ # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Boston, MA 02110-1301, USA. import os -import subprocess import urlparse -import urllib +import subprocess import ConfigParser from loggable import Loggable -from testdefinitions import Test, TestsManager, DEFAULT_TIMEOUT +from testdefinitions import GstValidateTest, TestsManager, DEFAULT_TIMEOUT +from utils import MediaFormatCombination, get_profile, path2url, Result, get_current_position, get_current_size DEFAULT_GST_VALIDATE = "gst-validate-1.0" +DEFAULT_GST_VALIDATE_TRANSCODING = "gst-validate-transcoding-1.0" DISCOVERER_COMMAND = ["gst-validate-media-check-1.0"] MEDIA_INFO_EXT = "media_info" STREAM_INFO = "stream_info" -SEEKING_REQUIERED_SCENARIO=["seek_forward", "seek_backward", "scrub_forward_seeking"] +SEEKING_REQUIERED_SCENARIO = ["seek_forward", "seek_backward", "scrub_forward_seeking"] SPECIAL_PROTOCOLS = [("application/x-hls", "hls")] -all_tests = { - "playback": - { - "pipelines": ["playbin uri=__uri__ audio_sink=autoaudiosink video_sink=autovideosink"], - "scenarios": ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"] - }, -} +PLAYBACK_TESTS = ["playbin uri=__uri__ audio_sink=autoaudiosink video_sink=autovideosink"] +SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"] + +COMBINATIONS = [ + MediaFormatCombination("ogg", "vorbis", "theora"), + MediaFormatCombination("webm", "vorbis", "vp8"), + MediaFormatCombination("mp4", "mp3", "h264"), + MediaFormatCombination("mkv", "vorbis", "h264")] -def path2url(path): - return urlparse.urljoin('file:', urllib.pathname2url(path)) - - -class GstValidateTest(Test): - def __init__(self, - classname, - options, - reporter, - pipeline_desc, - scenario, - file_infos=None, - timeout=DEFAULT_TIMEOUT): +class GstValidateLaunchTest(GstValidateTest): + def __init__(self, classname, options, reporter, pipeline_desc, + timeout=DEFAULT_TIMEOUT, scenario=None, file_infos=None): if file_infos is not None: timeout = file_infos.get("media-info", "file-duration") - super(GstValidateTest, self).__init__(DEFAULT_GST_VALIDATE, classname, - options, reporter, scenario, - timeout) + super(GstValidateLaunchTest, self).__init__(DEFAULT_GST_VALIDATE, classname, + options, reporter, timeout=timeout, + scenario=scenario,) self.pipeline_desc = pipeline_desc self.file_infos = file_infos def build_arguments(self): - Test.build_arguments(self) + GstValidateTest.build_arguments(self) self.add_arguments(self.pipeline_desc) + def get_current_value(self): + return get_current_position(self) + + +class GstValidateTranscodingTest(GstValidateTest): + def __init__(self, classname, options, reporter, + combination, uri, file_infos): + + if file_infos is not None: + timeout = file_infos.get("media-info", "file-duration") + + super(GstValidateTranscodingTest, self).__init__( + DEFAULT_GST_VALIDATE_TRANSCODING, classname, + options, reporter, timeout=timeout, scenario=None) + self.uri = uri + self.combination = combination + self.dest_file = "" + + def set_rendering_info(self): + self.dest_file = os.path.join(self.options.dest, + os.path.basename(self.uri) + + '-' + self.combination.acodec + + self.combination.vcodec + '.' + + self.combination.container) + if urlparse.urlparse(self.dest_file).scheme == "": + self.dest_file = path2url(self.dest_file) + + profile = get_profile(self.combination) + self.add_arguments("-o", profile) + + def build_arguments(self): + self.set_rendering_info() + self.add_arguments(self.uri, self.dest_file) + + def get_current_value(self): + return get_current_size(self) + class GstValidateManager(TestsManager, Loggable): @@ -86,11 +115,22 @@ class GstValidateManager(TestsManager, Loggable): % DISCOVERER_COMMAND[0]) def list_tests(self): - for mainname, tests in all_tests.iteritems(): - name = "validate.%s" % (mainname) - for pipe in tests["pipelines"]: - for scenario in tests["scenarios"]: - self._add_test(name, scenario, pipe) + for test_pipeline in PLAYBACK_TESTS: + name = "validate.playback" + for scenario in SCENARIOS: + self._add_playback_test(name, scenario, test_pipeline) + + for uri, config in self._list_uris(): + for comb in COMBINATIONS: + classname = "validate.transcode" + classname = "validate.transcode.from_%s.to_%s" % (os.path.splitext(os.path.basename(uri))[0], + str(comb).replace(' ', '_')) + self.tests.append(GstValidateTranscodingTest(classname, + self.options, + self.reporter, + comb, + uri, + config)) def _check_discovering_info(self, media_info, uri=None): self.debug("Checking %s", media_info) @@ -196,17 +236,17 @@ class GstValidateManager(TestsManager, Loggable): os.path.basename(uri).replace(".", "_")) self.debug("Adding: %s", fname) - self.tests.append(GstValidateTest(fname, + self.tests.append(GstValidateLaunchTest(fname, self.options, self.reporter, npipe.replace("__uri__", uri), - scenario, - config) + scenario=scenario, + file_infos=config) ) else: self.debug("Adding: %s", name) - self.tests.append(GstValidateTest(self._get_fname(fname, scenario), + self.tests.append(GstValidateLaunchTest(self._get_fname(fname, scenario), self.options, self.reporter, pipe, - scenario)) + scenario=scenario)) diff --git a/validate/tools/gst-validate-launcher.py b/validate/tools/gst-validate-launcher.py index cf574214c7..26f26fb308 100644 --- a/validate/tools/gst-validate-launcher.py +++ b/validate/tools/gst-validate-launcher.py @@ -19,9 +19,11 @@ import os import loggable -from testdefinitions import _TestsLauncher, DEFAULT_QA_SAMPLE_PATH -from utils import printc +import urlparse + +from utils import printc, path2url from optparse import OptionParser +from testdefinitions import _TestsLauncher, DEFAULT_GST_QA_ASSETS def main(): @@ -54,12 +56,16 @@ def main(): action="store_true", default=os.path.expanduser("~/gst-validate/logs/"), help="Directory where to store logs") parser.add_option("-p", "--medias-paths", dest="paths", - default=[os.path.join(DEFAULT_QA_SAMPLE_PATH, "medias")], + default=[os.path.join(DEFAULT_GST_QA_ASSETS, "medias")], help="Paths in which to look for media files") parser.add_option("-m", "--mute", dest="mute", action="store_true", default=False, help="Mute playback output, which mean that we use " "a fakesink") + parser.add_option("-o", "--output-path", dest="dest", + default=None, + help="Set the path to which projects should be" + " renderd") loggable.init("GST_VALIDATE_LAUNCHER_DEBUG", True, False) tests_launcher = _TestsLauncher() @@ -67,6 +73,10 @@ def main(): (options, args) = parser.parse_args() if options.xunit_file is None: options.xunit_file = os.path.join(options.logsdir, "xunit.xml") + if options.dest is None: + options.dest = os.path.join(options.logsdir, "rendered") + if urlparse.urlparse(options.dest).scheme == "": + options.dest = path2url(options.dest) tests_launcher.set_settings(options, args) tests_launcher.list_tests() if options.list_tests: diff --git a/validate/tools/testdefinitions.py b/validate/tools/testdefinitions.py index 72c1a58ae5..3c99e902c4 100644 --- a/validate/tools/testdefinitions.py +++ b/validate/tools/testdefinitions.py @@ -31,14 +31,15 @@ from utils import mkdir, Result, Colors, printc DEFAULT_TIMEOUT = 10 -DEFAULT_QA_SAMPLE_PATH = os.path.join(os.path.expanduser('~'), "Videos", - "gst-qa-samples") +DEFAULT_GST_QA_ASSETS = os.path.join(os.path.expanduser('~'), "Videos", + "gst-qa-assets") class Test(Loggable): """ A class representing a particular test. """ - def __init__(self, application_name, classname, options, reporter, scenario=None, timeout=DEFAULT_TIMEOUT): + def __init__(self, application_name, classname, options, + reporter, timeout=DEFAULT_TIMEOUT): Loggable.__init__(self) self.timeout = timeout self.classname = classname @@ -47,10 +48,6 @@ class Test(Loggable): self.command = "" self.reporter = reporter self.process = None - if scenario.lower() == "none": - self.scenario = None - else: - self.scenario = scenario self.message = "" self.error = "" @@ -73,8 +70,7 @@ class Test(Loggable): self.command += " " + arg def build_arguments(self): - if self.scenario is not None: - self.add_arguments("--set-scenario", self.scenario) + pass def set_result(self, result, message="", error=""): self.result = result @@ -82,7 +78,10 @@ class Test(Loggable): self.error = error def check_results(self): - self.debug("%s returncode: %d", self, self.process.returncode) + if self.result is Result.FAILED: + return + + self.debug("%s returncode: %s", self, self.process.returncode) if self.result == Result.TIMEOUT: self.set_result(Result.TIMEOUT, "Application timed out", "timeout") elif self.process.returncode == 0: @@ -97,25 +96,100 @@ class Test(Loggable): self.set_result(Result.FAILED, "Application returned %d (issues: %s)" % ( self.process.returncode, - self.get_validate_criticals_errors()), "error") + ) + + def get_current_value(self): + """ + Lets subclasses implement a nicer timeout measurement method + They should return some value with which we will compare + the previous and timeout if they are egual during self.timeout + seconds + """ + return Result.NOT_RUN def wait_process(self): + last_val = 0 last_change_ts = time.time() while True: self.process.poll() if self.process.returncode is not None: break - if time.time() - last_change_ts > self.timeout: - self.result = Result.TIMEOUT - # Dirty way to avoid eating to much CPU... # good enough for us anyway. time.sleep(1) + val = self.get_current_value() + + if val is Result.NOT_RUN: + # The get_current_value logic is not implemented... dumb timeout + if time.time() - last_change_ts > self.timeout: + self.result = Result.TIMEOUT + break + continue + elif val is Result.FAILED: + self.result = Result.FAILED + break + + self.log("New val %s" % val) + + if val == last_val: + delta = time.time() - last_change_ts + self.debug("Same value for %d seconds" % delta) + if delta > self.timeout: + self.result = Result.TIMEOUT + break + else: + last_change_ts = time.time() + last_val = val + self.check_results() + def run(self): + self.command = "%s " % (self.application) + self._starting_time = time.time() + self.build_arguments() + printc("Launching:%s '%s' -- logs are in %s" % (Colors.ENDC, self.command, + self.reporter.out.name), Colors.OKBLUE) + try: + self.process = subprocess.Popen(self.command, + stderr=self.reporter.out, + stdout=self.reporter.out, + shell=True) + self.wait_process() + except KeyboardInterrupt: + self.process.kill() + raise + + try: + self.process.terminate() + except OSError: + pass + + self.time_taken = time.time() - self._starting_time + + +class GstValidateTest(Test): + + """ A class representing a particular test. """ + + def __init__(self, application_name, classname, + options, reporter, timeout=DEFAULT_TIMEOUT, + scenario=None): + + super(GstValidateTest, self).__init__(application_name, classname, options, + reporter, timeout=DEFAULT_TIMEOUT) + + if scenario is None or scenario.lower() == "none": + self.scenario = None + else: + self.scenario = scenario + + def build_arguments(self): + if self.scenario is not None: + self.add_arguments("--set-scenario", self.scenario) + def get_validate_criticals_errors(self): self.reporter.out.seek(0) ret = "[" @@ -135,28 +209,6 @@ class Test(Loggable): else: return ret + "]" - def run(self): - self.command = "%s " % (self.application) - self._starting_time = time.time() - self.build_arguments() - printc("Launching:%s '%s' -- logs are in %s" % (Colors.ENDC, self.classname, - self.reporter.out.name), Colors.OKBLUE) - try: - self.process = subprocess.Popen(self.command, - stderr=self.reporter.out, - stdout=self.reporter.out, - shell=True) - self.wait_process() - except KeyboardInterrupt: - self.process.kill() - raise - - try: - self.process.terminate() - except OSError: - pass - - self.time_taken = time.time() - self._starting_time class TestsManager(object): diff --git a/validate/tools/utils.py b/validate/tools/utils.py index d32237e3a5..8ff69e1721 100644 --- a/validate/tools/utils.py +++ b/validate/tools/utils.py @@ -19,6 +19,11 @@ """ Some utilies. """ import os +import urllib +import urlparse + + +GST_SECOND = 1000000000 class Result(object): @@ -70,3 +75,119 @@ def printc (message, color="", title=False): def launch_command(command, color=None): printc(command, Colors.OKGREEN, True) os.system(command) + +def path2url(path): + return urlparse.urljoin('file:', urllib.pathname2url(path)) + + +############################## +# Encoding related utils # +############################## +class MediaFormatCombination(object): + + def __str__(self): + return "%s and %s in %s" % (self.acodec, self.vcodec, self.container) + + def __init__(self, container, acodec, vcodec): + self.container = container + self.acodec = acodec + self.vcodec = vcodec + + +FORMATS = {"aac": "audio/mpeg,mpegversion=4", + "ac3": "audio/x-ac3", + "vorbis": "audio/x-vorbis", + "mp3": "audio/mpeg,mpegversion=1,layer=3", + "h264": "video/x-h264", + "vp8": "video/x-vp8", + "theora": "video/x-theora", + "ogg": "application/ogg", + "mkv": "video/x-matroska", + "mp4": "video/quicktime,variant=iso;", + "webm": "video/x-matroska"} + +def get_profile_full(muxer, venc, aenc, video_restriction=None, + audio_restriction=None, + audio_presence=0, video_presence=0): + ret = "\"" + if muxer: + ret += muxer + ret += ":" + if venc: + if video_restriction is not None: + ret = ret + video_restriction + '->' + ret += venc + if video_presence: + ret = ret + '|' + str(video_presence) + if aenc: + ret += ":" + if audio_restriction is not None: + ret = ret + audio_restriction + '->' + ret += aenc + if audio_presence: + ret = ret + '|' + str(audio_presence) + + ret += "\"" + return ret.replace("::", ":") + + + +def get_profile(combination): + return get_profile_full(FORMATS[combination.container], + FORMATS[combination.vcodec], + FORMATS[combination.acodec], + video_restriction="video/x-raw,format=I420") + +################################################## +# Some utilities to parse gst-validate output # +################################################## + + +def _parse_position(p): + def parse_gsttimeargs(time): + return int(time.split(":")[0]) * 3600 + int(time.split(":")[1]) * 60 + int(time.split(":")[2].split(".")[0]) * 60 + + start_stop = p.replace("", '').split(" / ") + + return parse_gsttimeargs(start_stop[0]), parse_gsttimeargs(start_stop[1]) + + +def _get_position(test): + position = duration = 0 + + test.reporter.out.seek(0) + m = None + for l in test.reporter.out.readlines(): + if ""): + position, duration = _parse_position(j) + + return position, duration + + +def get_current_position(test, max_passed_stop=0.5): + position, duration = _get_position(test) + + if position > duration + max_passed_stop: + test.set_result(Result.FAILED, + "The position is reported as > than the" + " duration (position: %d > duration: %d)" + % (position, duration)) + return Result.FAILED + + return position + + +def get_current_size(test): + position = get_current_position(test) + + if position is Result.FAILED: + return position + + return os.stat(urlparse.urlparse(test.dest_file).path).st_size