diff --git a/validate/tools/apps/ges-projects-tests.py b/validate/tools/apps/ges-projects-tests.py
index 6a16bfff25..fc5c5c6e86 100644
--- a/validate/tools/apps/ges-projects-tests.py
+++ b/validate/tools/apps/ges-projects-tests.py
@@ -18,88 +18,32 @@
# Boston, MA 02110-1301, USA.
import os
-import time
+import urlparse
from urllib import unquote
-from urlparse import urlsplit
from gi.repository import GES, Gst, GLib
-from testdefinitions import Test, DEFAULT_QA_SAMPLE_PATH, TestsManager
+from testdefinitions import GstValidateTest, DEFAULT_GST_QA_ASSETS, TestsManager
+from utils import MediaFormatCombination, get_profile, Result, get_current_position, get_current_size
DURATION_TOLERANCE = Gst.SECOND / 2
DEFAULT_GES_LAUNCH = "ges-launch-1.0"
-class Combination(object):
-
- def __str__(self):
- return "%s and %s in %s" % (self.acodec, self.vcodec, self.container)
-
- def __init__(self, container, acodec, vcodec):
- self.container = container
- self.acodec = acodec
- self.vcodec = vcodec
-
-
-FORMATS = {"aac": "audio/mpeg,mpegversion=4",
- "ac3": "audio/x-ac3",
- "vorbis": "audio/x-vorbis",
- "mp3": "audio/mpeg,mpegversion=1,layer=3",
- "h264": "video/x-h264",
- "vp8": "video/x-vp8",
- "theora": "video/x-theora",
- "ogg": "application/ogg",
- "mkv": "video/x-matroska",
- "mp4": "video/quicktime,variant=iso;",
- "webm": "video/x-matroska"}
-
COMBINATIONS = [
- Combination("ogg", "vorbis", "theora"),
- Combination("webm", "vorbis", "vp8"),
- Combination("mp4", "mp3", "h264"),
- Combination("mkv", "vorbis", "h264")]
+ MediaFormatCombination("ogg", "vorbis", "theora"),
+ MediaFormatCombination("webm", "vorbis", "vp8"),
+ MediaFormatCombination("mp4", "mp3", "h264"),
+ MediaFormatCombination("mkv", "vorbis", "h264")]
SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"]
-def get_profile_full(muxer, venc, aenc, video_restriction=None,
- audio_restriction=None,
- audio_presence=0, video_presence=0):
- ret = "\""
- if muxer:
- ret += muxer
- ret += ":"
- if venc:
- if video_restriction is not None:
- ret = ret + video_restriction + '->'
- ret += venc
- if video_presence:
- ret = ret + '|' + str(video_presence)
- if aenc:
- ret += ":"
- if audio_restriction is not None:
- ret = ret + audio_restriction + '->'
- ret += aenc
- if audio_presence:
- ret = ret + '|' + str(audio_presence)
-
- ret += "\""
- return ret.replace("::", ":")
-
-
-
-def get_profile(combination):
- return get_profile_full(FORMATS[combination.container],
- FORMATS[combination.vcodec],
- FORMATS[combination.acodec],
- video_restriction="video/x-raw,format=I420")
-
-
def quote_uri(uri):
"""
Encode a URI/path according to RFC 2396, without touching the file:/// part.
"""
# Split off the "file:///" part, if present.
- parts = urlsplit(uri, allow_fragments=False)
+ parts = urlparse.urlsplit(uri, allow_fragments=False)
# Make absolutely sure the string is unquoted before quoting again!
raw_path = unquote(parts.path)
# For computing thumbnail md5 hashes in the media library, we must adhere to
@@ -107,13 +51,12 @@ def quote_uri(uri):
return Gst.filename_to_uri(raw_path)
-class GESTest(Test):
- def __init__(self, classname, options, reporter, project_uri, scenario,
+class GESTest(GstValidateTest):
+ def __init__(self, classname, options, reporter, project_uri, scenario=None,
combination=None):
super(GESTest, self).__init__(DEFAULT_GES_LAUNCH, classname, options, reporter,
- scenario)
+ scenario=scenario)
self.project_uri = project_uri
- self.combination = combination
proj = GES.Project.new(project_uri)
tl = proj.extract()
if tl is None:
@@ -125,18 +68,6 @@ class GESTest(Test):
else:
self.duration = 2 * 60
- def set_rendering_info(self):
- self.dest_file = os.path.join(self.options.dest,
- os.path.basename(self.project_uri) +
- '-' + self.combination.acodec +
- self.combination.vcodec + '.' +
- self.combination.container)
- if not Gst.uri_is_valid(self.dest_file):
- self.dest_file = GLib.filename_to_uri(self.dest_file, None)
-
- profile = get_profile(self.combination)
- self.add_arguments("-f", profile, "-o", self.dest_file)
-
def set_sample_paths(self):
if not self.options.paths:
if not self.options.recurse_paths:
@@ -155,14 +86,12 @@ class GESTest(Test):
root,
directory)
)
- )
+ )
else:
self.add_arguments("--sample-paths", "file://" + path)
def build_arguments(self):
- Test.build_arguments(self)
- if self.combination is not None:
- self.set_rendering_info()
+ GstValidateTest.build_arguments(self)
if self.options.mute:
self.add_arguments(" --mute")
@@ -170,27 +99,55 @@ class GESTest(Test):
self.set_sample_paths()
self.add_arguments("-l", self.project_uri)
+
+class GESPlaybackTest(GESTest):
+ def __init__(self, classname, options, reporter, project_uri, scenario):
+ super(GESPlaybackTest, self).__init__(classname, options, reporter,
+ project_uri, scenario=scenario)
+
+ def get_current_value(self):
+ return get_current_position(self)
+
+
+class GESRenderTest(GESTest):
+ def __init__(self, classname, options, reporter, project_uri, combination):
+ super(GESRenderTest, self).__init__(classname, options, reporter,
+ project_uri)
+ self.combination = combination
+
+ def build_arguments(self):
+ GESTest.build_arguments(self)
+ self._set_rendering_info()
+
+ def _set_rendering_info(self):
+ self.dest_file = os.path.join(self.options.dest,
+ os.path.basename(self.project_uri) +
+ '-' + self.combination.acodec +
+ self.combination.vcodec + '.' +
+ self.combination.container)
+ if not Gst.uri_is_valid(self.dest_file):
+ self.dest_file = GLib.filename_to_uri(self.dest_file, None)
+
+ profile = get_profile(self.combination)
+ self.add_arguments("-f", profile, "-o", self.dest_file)
+
def check_results(self):
if self.process.returncode == 0:
- if self.combination:
- try:
- asset = GES.UriClipAsset.request_sync(self.dest_file)
- if self.duration - DURATION_TOLERANCE <= asset.get_duration() \
- <= self.duration + DURATION_TOLERANCE:
- self.set_result(Result.FAILURE, "Duration of encoded file is "
- " wrong (%s instead of %s)" %
- (Gst.TIME_ARGS(self.duration),
- Gst.TIME_ARGS(asset.get_duration())),
- "wrong-duration")
- else:
- self.set_result(Result.PASSED)
- except GLib.Error as e:
- self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e)
-
- else:
- self.set_result(Result.PASSED)
+ try:
+ asset = GES.UriClipAsset.request_sync(self.dest_file)
+ if self.duration - DURATION_TOLERANCE <= asset.get_duration() \
+ <= self.duration + DURATION_TOLERANCE:
+ self.set_result(Result.FAILURE, "Duration of encoded file is "
+ " wrong (%s instead of %s)" %
+ (Gst.TIME_ARGS(self.duration),
+ Gst.TIME_ARGS(asset.get_duration())),
+ "wrong-duration")
+ else:
+ self.set_result(Result.PASSED)
+ except GLib.Error as e:
+ self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e)
else:
- if self.combination and self.result == Result.TIMEOUT:
+ if self.result == Result.TIMEOUT:
missing_eos = False
try:
asset = GES.UriClipAsset.request_sync(self.dest_file)
@@ -203,65 +160,22 @@ class GESTest(Test):
self.set_result(Result.TIMEOUT, "The rendered file add right duration, MISSING EOS?\n",
"failure", e)
else:
- Test.check_results(self)
-
- def wait_process(self):
- last_val = 0
- last_change_ts = time.time()
- while True:
- self.process.poll()
- if self.process.returncode is not None:
- self.check_results()
- break
-
- # Dirty way to avoid eating to much CPU... good enough for us anyway.
- time.sleep(1)
-
- if self.combination:
- val = os.stat(GLib.filename_from_uri(self.dest_file)[0]).st_size
- else:
- val = self.get_last_position()
-
- if val == last_val:
- if time.time() - last_change_ts > 10:
- self.result = Result.TIMEOUT
- else:
- last_change_ts = time.time()
- last_val = val
-
-
- def get_last_position(self):
- self.reporter.out.seek(0)
- m = None
- for l in self.reporter.out.readlines():
- if ""):
- pos = j
-
- return pos
+ GstValidateTest.check_results(self)
+ def get_current_value(self):
+ return get_current_size(self)
class GESTestsManager(TestsManager):
name = "ges"
+
def __init__(self):
super(GESTestsManager, self).__init__()
Gst.init(None)
GES.init()
def add_options(self, group):
- group.add_option("-o", "--output-path", dest="dest",
- default=None,
- help="Set the path to which projects should be"
- " renderd")
group.add_option("-P", "--projects-paths", dest="projects_paths",
- default=os.path.join(DEFAULT_QA_SAMPLE_PATH, "ges-projects"),
+ default=os.path.join(DEFAULT_GST_QA_ASSETS, "ges-projects"),
help="Paths in which to look for moved medias")
group.add_option("-r", "--recurse-paths", dest="recurse_paths",
default=False, action="store_true",
@@ -270,12 +184,6 @@ class GESTestsManager(TestsManager):
def set_settings(self, options, args, reporter):
TestsManager.set_settings(self, options, args, reporter)
- if options.dest is None:
- options.dest = os.path.join(options.logsdir, "rendered")
-
- if not Gst.uri_is_valid(options.dest):
- options.dest = GLib.filename_to_uri(options.dest, None)
-
try:
os.makedirs(GLib.filename_from_uri(options.dest)[0])
print "Created directory: %s" % options.dest
@@ -307,21 +215,18 @@ class GESTestsManager(TestsManager):
# First playback casses
for scenario in SCENARIOS:
classname = "ges.playback.%s.%s" % (scenario, os.path.basename(proj).replace(".xges", ""))
- self.tests.append(GESTest(classname,
- self.options,
- self.reporter,
- proj,
- scenario)
+ self.tests.append(GESPlaybackTest(classname,
+ self.options,
+ self.reporter,
+ proj,
+ scenario=scenario)
)
# And now rendering casses
for comb in COMBINATIONS:
classname = "ges.render.%s.%s" % (str(comb).replace(' ', '_'),
- os.path.basename(proj).replace(".xges", ""))
- self.tests.append(GESTest(classname,
- self.options,
- self.reporter,
- proj,
- None,
- comb)
+ os.path.splitext(os.path.basename(proj))[0])
+ self.tests.append(GESRenderTest(classname, self.options,
+ self.reporter, proj,
+ combination=comb)
)
diff --git a/validate/tools/apps/gst-validate.py b/validate/tools/apps/gst-validate.py
index 95e4ffaeac..0b83a603d1 100644
--- a/validate/tools/apps/gst-validate.py
+++ b/validate/tools/apps/gst-validate.py
@@ -17,58 +17,87 @@
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
import os
-import subprocess
import urlparse
-import urllib
+import subprocess
import ConfigParser
from loggable import Loggable
-from testdefinitions import Test, TestsManager, DEFAULT_TIMEOUT
+from testdefinitions import GstValidateTest, TestsManager, DEFAULT_TIMEOUT
+from utils import MediaFormatCombination, get_profile, path2url, Result, get_current_position, get_current_size
DEFAULT_GST_VALIDATE = "gst-validate-1.0"
+DEFAULT_GST_VALIDATE_TRANSCODING = "gst-validate-transcoding-1.0"
DISCOVERER_COMMAND = ["gst-validate-media-check-1.0"]
MEDIA_INFO_EXT = "media_info"
STREAM_INFO = "stream_info"
-SEEKING_REQUIERED_SCENARIO=["seek_forward", "seek_backward", "scrub_forward_seeking"]
+SEEKING_REQUIERED_SCENARIO = ["seek_forward", "seek_backward", "scrub_forward_seeking"]
SPECIAL_PROTOCOLS = [("application/x-hls", "hls")]
-all_tests = {
- "playback":
- {
- "pipelines": ["playbin uri=__uri__ audio_sink=autoaudiosink video_sink=autovideosink"],
- "scenarios": ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"]
- },
-}
+PLAYBACK_TESTS = ["playbin uri=__uri__ audio_sink=autoaudiosink video_sink=autovideosink"]
+SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"]
+
+COMBINATIONS = [
+ MediaFormatCombination("ogg", "vorbis", "theora"),
+ MediaFormatCombination("webm", "vorbis", "vp8"),
+ MediaFormatCombination("mp4", "mp3", "h264"),
+ MediaFormatCombination("mkv", "vorbis", "h264")]
-def path2url(path):
- return urlparse.urljoin('file:', urllib.pathname2url(path))
-
-
-class GstValidateTest(Test):
- def __init__(self,
- classname,
- options,
- reporter,
- pipeline_desc,
- scenario,
- file_infos=None,
- timeout=DEFAULT_TIMEOUT):
+class GstValidateLaunchTest(GstValidateTest):
+ def __init__(self, classname, options, reporter, pipeline_desc,
+ timeout=DEFAULT_TIMEOUT, scenario=None, file_infos=None):
if file_infos is not None:
timeout = file_infos.get("media-info", "file-duration")
- super(GstValidateTest, self).__init__(DEFAULT_GST_VALIDATE, classname,
- options, reporter, scenario,
- timeout)
+ super(GstValidateLaunchTest, self).__init__(DEFAULT_GST_VALIDATE, classname,
+ options, reporter, timeout=timeout,
+ scenario=scenario,)
self.pipeline_desc = pipeline_desc
self.file_infos = file_infos
def build_arguments(self):
- Test.build_arguments(self)
+ GstValidateTest.build_arguments(self)
self.add_arguments(self.pipeline_desc)
+ def get_current_value(self):
+ return get_current_position(self)
+
+
+class GstValidateTranscodingTest(GstValidateTest):
+ def __init__(self, classname, options, reporter,
+ combination, uri, file_infos):
+
+ if file_infos is not None:
+ timeout = file_infos.get("media-info", "file-duration")
+
+ super(GstValidateTranscodingTest, self).__init__(
+ DEFAULT_GST_VALIDATE_TRANSCODING, classname,
+ options, reporter, timeout=timeout, scenario=None)
+ self.uri = uri
+ self.combination = combination
+ self.dest_file = ""
+
+ def set_rendering_info(self):
+ self.dest_file = os.path.join(self.options.dest,
+ os.path.basename(self.uri) +
+ '-' + self.combination.acodec +
+ self.combination.vcodec + '.' +
+ self.combination.container)
+ if urlparse.urlparse(self.dest_file).scheme == "":
+ self.dest_file = path2url(self.dest_file)
+
+ profile = get_profile(self.combination)
+ self.add_arguments("-o", profile)
+
+ def build_arguments(self):
+ self.set_rendering_info()
+ self.add_arguments(self.uri, self.dest_file)
+
+ def get_current_value(self):
+ return get_current_size(self)
+
class GstValidateManager(TestsManager, Loggable):
@@ -86,11 +115,22 @@ class GstValidateManager(TestsManager, Loggable):
% DISCOVERER_COMMAND[0])
def list_tests(self):
- for mainname, tests in all_tests.iteritems():
- name = "validate.%s" % (mainname)
- for pipe in tests["pipelines"]:
- for scenario in tests["scenarios"]:
- self._add_test(name, scenario, pipe)
+ for test_pipeline in PLAYBACK_TESTS:
+ name = "validate.playback"
+ for scenario in SCENARIOS:
+ self._add_playback_test(name, scenario, test_pipeline)
+
+ for uri, config in self._list_uris():
+ for comb in COMBINATIONS:
+ classname = "validate.transcode"
+ classname = "validate.transcode.from_%s.to_%s" % (os.path.splitext(os.path.basename(uri))[0],
+ str(comb).replace(' ', '_'))
+ self.tests.append(GstValidateTranscodingTest(classname,
+ self.options,
+ self.reporter,
+ comb,
+ uri,
+ config))
def _check_discovering_info(self, media_info, uri=None):
self.debug("Checking %s", media_info)
@@ -196,17 +236,17 @@ class GstValidateManager(TestsManager, Loggable):
os.path.basename(uri).replace(".", "_"))
self.debug("Adding: %s", fname)
- self.tests.append(GstValidateTest(fname,
+ self.tests.append(GstValidateLaunchTest(fname,
self.options,
self.reporter,
npipe.replace("__uri__", uri),
- scenario,
- config)
+ scenario=scenario,
+ file_infos=config)
)
else:
self.debug("Adding: %s", name)
- self.tests.append(GstValidateTest(self._get_fname(fname, scenario),
+ self.tests.append(GstValidateLaunchTest(self._get_fname(fname, scenario),
self.options,
self.reporter,
pipe,
- scenario))
+ scenario=scenario))
diff --git a/validate/tools/gst-validate-launcher.py b/validate/tools/gst-validate-launcher.py
index cf574214c7..26f26fb308 100644
--- a/validate/tools/gst-validate-launcher.py
+++ b/validate/tools/gst-validate-launcher.py
@@ -19,9 +19,11 @@
import os
import loggable
-from testdefinitions import _TestsLauncher, DEFAULT_QA_SAMPLE_PATH
-from utils import printc
+import urlparse
+
+from utils import printc, path2url
from optparse import OptionParser
+from testdefinitions import _TestsLauncher, DEFAULT_GST_QA_ASSETS
def main():
@@ -54,12 +56,16 @@ def main():
action="store_true", default=os.path.expanduser("~/gst-validate/logs/"),
help="Directory where to store logs")
parser.add_option("-p", "--medias-paths", dest="paths",
- default=[os.path.join(DEFAULT_QA_SAMPLE_PATH, "medias")],
+ default=[os.path.join(DEFAULT_GST_QA_ASSETS, "medias")],
help="Paths in which to look for media files")
parser.add_option("-m", "--mute", dest="mute",
action="store_true", default=False,
help="Mute playback output, which mean that we use "
"a fakesink")
+ parser.add_option("-o", "--output-path", dest="dest",
+ default=None,
+ help="Set the path to which projects should be"
+ " renderd")
loggable.init("GST_VALIDATE_LAUNCHER_DEBUG", True, False)
tests_launcher = _TestsLauncher()
@@ -67,6 +73,10 @@ def main():
(options, args) = parser.parse_args()
if options.xunit_file is None:
options.xunit_file = os.path.join(options.logsdir, "xunit.xml")
+ if options.dest is None:
+ options.dest = os.path.join(options.logsdir, "rendered")
+ if urlparse.urlparse(options.dest).scheme == "":
+ options.dest = path2url(options.dest)
tests_launcher.set_settings(options, args)
tests_launcher.list_tests()
if options.list_tests:
diff --git a/validate/tools/testdefinitions.py b/validate/tools/testdefinitions.py
index 72c1a58ae5..3c99e902c4 100644
--- a/validate/tools/testdefinitions.py
+++ b/validate/tools/testdefinitions.py
@@ -31,14 +31,15 @@ from utils import mkdir, Result, Colors, printc
DEFAULT_TIMEOUT = 10
-DEFAULT_QA_SAMPLE_PATH = os.path.join(os.path.expanduser('~'), "Videos",
- "gst-qa-samples")
+DEFAULT_GST_QA_ASSETS = os.path.join(os.path.expanduser('~'), "Videos",
+ "gst-qa-assets")
class Test(Loggable):
""" A class representing a particular test. """
- def __init__(self, application_name, classname, options, reporter, scenario=None, timeout=DEFAULT_TIMEOUT):
+ def __init__(self, application_name, classname, options,
+ reporter, timeout=DEFAULT_TIMEOUT):
Loggable.__init__(self)
self.timeout = timeout
self.classname = classname
@@ -47,10 +48,6 @@ class Test(Loggable):
self.command = ""
self.reporter = reporter
self.process = None
- if scenario.lower() == "none":
- self.scenario = None
- else:
- self.scenario = scenario
self.message = ""
self.error = ""
@@ -73,8 +70,7 @@ class Test(Loggable):
self.command += " " + arg
def build_arguments(self):
- if self.scenario is not None:
- self.add_arguments("--set-scenario", self.scenario)
+ pass
def set_result(self, result, message="", error=""):
self.result = result
@@ -82,7 +78,10 @@ class Test(Loggable):
self.error = error
def check_results(self):
- self.debug("%s returncode: %d", self, self.process.returncode)
+ if self.result is Result.FAILED:
+ return
+
+ self.debug("%s returncode: %s", self, self.process.returncode)
if self.result == Result.TIMEOUT:
self.set_result(Result.TIMEOUT, "Application timed out", "timeout")
elif self.process.returncode == 0:
@@ -97,25 +96,100 @@ class Test(Loggable):
self.set_result(Result.FAILED,
"Application returned %d (issues: %s)" % (
self.process.returncode,
- self.get_validate_criticals_errors()),
"error")
+ )
+
+ def get_current_value(self):
+ """
+ Lets subclasses implement a nicer timeout measurement method
+ They should return some value with which we will compare
+ the previous and timeout if they are egual during self.timeout
+ seconds
+ """
+ return Result.NOT_RUN
def wait_process(self):
+ last_val = 0
last_change_ts = time.time()
while True:
self.process.poll()
if self.process.returncode is not None:
break
- if time.time() - last_change_ts > self.timeout:
- self.result = Result.TIMEOUT
-
# Dirty way to avoid eating to much CPU...
# good enough for us anyway.
time.sleep(1)
+ val = self.get_current_value()
+
+ if val is Result.NOT_RUN:
+ # The get_current_value logic is not implemented... dumb timeout
+ if time.time() - last_change_ts > self.timeout:
+ self.result = Result.TIMEOUT
+ break
+ continue
+ elif val is Result.FAILED:
+ self.result = Result.FAILED
+ break
+
+ self.log("New val %s" % val)
+
+ if val == last_val:
+ delta = time.time() - last_change_ts
+ self.debug("Same value for %d seconds" % delta)
+ if delta > self.timeout:
+ self.result = Result.TIMEOUT
+ break
+ else:
+ last_change_ts = time.time()
+ last_val = val
+
self.check_results()
+ def run(self):
+ self.command = "%s " % (self.application)
+ self._starting_time = time.time()
+ self.build_arguments()
+ printc("Launching:%s '%s' -- logs are in %s" % (Colors.ENDC, self.command,
+ self.reporter.out.name), Colors.OKBLUE)
+ try:
+ self.process = subprocess.Popen(self.command,
+ stderr=self.reporter.out,
+ stdout=self.reporter.out,
+ shell=True)
+ self.wait_process()
+ except KeyboardInterrupt:
+ self.process.kill()
+ raise
+
+ try:
+ self.process.terminate()
+ except OSError:
+ pass
+
+ self.time_taken = time.time() - self._starting_time
+
+
+class GstValidateTest(Test):
+
+ """ A class representing a particular test. """
+
+ def __init__(self, application_name, classname,
+ options, reporter, timeout=DEFAULT_TIMEOUT,
+ scenario=None):
+
+ super(GstValidateTest, self).__init__(application_name, classname, options,
+ reporter, timeout=DEFAULT_TIMEOUT)
+
+ if scenario is None or scenario.lower() == "none":
+ self.scenario = None
+ else:
+ self.scenario = scenario
+
+ def build_arguments(self):
+ if self.scenario is not None:
+ self.add_arguments("--set-scenario", self.scenario)
+
def get_validate_criticals_errors(self):
self.reporter.out.seek(0)
ret = "["
@@ -135,28 +209,6 @@ class Test(Loggable):
else:
return ret + "]"
- def run(self):
- self.command = "%s " % (self.application)
- self._starting_time = time.time()
- self.build_arguments()
- printc("Launching:%s '%s' -- logs are in %s" % (Colors.ENDC, self.classname,
- self.reporter.out.name), Colors.OKBLUE)
- try:
- self.process = subprocess.Popen(self.command,
- stderr=self.reporter.out,
- stdout=self.reporter.out,
- shell=True)
- self.wait_process()
- except KeyboardInterrupt:
- self.process.kill()
- raise
-
- try:
- self.process.terminate()
- except OSError:
- pass
-
- self.time_taken = time.time() - self._starting_time
class TestsManager(object):
diff --git a/validate/tools/utils.py b/validate/tools/utils.py
index d32237e3a5..8ff69e1721 100644
--- a/validate/tools/utils.py
+++ b/validate/tools/utils.py
@@ -19,6 +19,11 @@
""" Some utilies. """
import os
+import urllib
+import urlparse
+
+
+GST_SECOND = 1000000000
class Result(object):
@@ -70,3 +75,119 @@ def printc (message, color="", title=False):
def launch_command(command, color=None):
printc(command, Colors.OKGREEN, True)
os.system(command)
+
+def path2url(path):
+ return urlparse.urljoin('file:', urllib.pathname2url(path))
+
+
+##############################
+# Encoding related utils #
+##############################
+class MediaFormatCombination(object):
+
+ def __str__(self):
+ return "%s and %s in %s" % (self.acodec, self.vcodec, self.container)
+
+ def __init__(self, container, acodec, vcodec):
+ self.container = container
+ self.acodec = acodec
+ self.vcodec = vcodec
+
+
+FORMATS = {"aac": "audio/mpeg,mpegversion=4",
+ "ac3": "audio/x-ac3",
+ "vorbis": "audio/x-vorbis",
+ "mp3": "audio/mpeg,mpegversion=1,layer=3",
+ "h264": "video/x-h264",
+ "vp8": "video/x-vp8",
+ "theora": "video/x-theora",
+ "ogg": "application/ogg",
+ "mkv": "video/x-matroska",
+ "mp4": "video/quicktime,variant=iso;",
+ "webm": "video/x-matroska"}
+
+def get_profile_full(muxer, venc, aenc, video_restriction=None,
+ audio_restriction=None,
+ audio_presence=0, video_presence=0):
+ ret = "\""
+ if muxer:
+ ret += muxer
+ ret += ":"
+ if venc:
+ if video_restriction is not None:
+ ret = ret + video_restriction + '->'
+ ret += venc
+ if video_presence:
+ ret = ret + '|' + str(video_presence)
+ if aenc:
+ ret += ":"
+ if audio_restriction is not None:
+ ret = ret + audio_restriction + '->'
+ ret += aenc
+ if audio_presence:
+ ret = ret + '|' + str(audio_presence)
+
+ ret += "\""
+ return ret.replace("::", ":")
+
+
+
+def get_profile(combination):
+ return get_profile_full(FORMATS[combination.container],
+ FORMATS[combination.vcodec],
+ FORMATS[combination.acodec],
+ video_restriction="video/x-raw,format=I420")
+
+##################################################
+# Some utilities to parse gst-validate output #
+##################################################
+
+
+def _parse_position(p):
+ def parse_gsttimeargs(time):
+ return int(time.split(":")[0]) * 3600 + int(time.split(":")[1]) * 60 + int(time.split(":")[2].split(".")[0]) * 60
+
+ start_stop = p.replace("", '').split(" / ")
+
+ return parse_gsttimeargs(start_stop[0]), parse_gsttimeargs(start_stop[1])
+
+
+def _get_position(test):
+ position = duration = 0
+
+ test.reporter.out.seek(0)
+ m = None
+ for l in test.reporter.out.readlines():
+ if ""):
+ position, duration = _parse_position(j)
+
+ return position, duration
+
+
+def get_current_position(test, max_passed_stop=0.5):
+ position, duration = _get_position(test)
+
+ if position > duration + max_passed_stop:
+ test.set_result(Result.FAILED,
+ "The position is reported as > than the"
+ " duration (position: %d > duration: %d)"
+ % (position, duration))
+ return Result.FAILED
+
+ return position
+
+
+def get_current_size(test):
+ position = get_current_position(test)
+
+ if position is Result.FAILED:
+ return position
+
+ return os.stat(urlparse.urlparse(test.dest_file).path).st_size