validate:launch: Port to the new media_info format

This commit is contained in:
Thibault Saunier 2014-04-25 10:23:21 +02:00
parent d280d0dbc9
commit 81b0c74bfa
2 changed files with 41 additions and 35 deletions

View file

@ -21,6 +21,7 @@ import time
import urlparse import urlparse
import subprocess import subprocess
import ConfigParser import ConfigParser
import xml.etree.ElementTree as ET
from loggable import Loggable from loggable import Loggable
from baseclasses import GstValidateTest, TestsManager, Test, ScenarioManager, NamedDic from baseclasses import GstValidateTest, TestsManager, Test, ScenarioManager, NamedDic
@ -28,6 +29,11 @@ from utils import MediaFormatCombination, get_profile,\
path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \ path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \
compare_rendered_with_original, Protocols compare_rendered_with_original, Protocols
def is_image(media_xml):
for stream in media_xml.findall("streams")[0].findall("stream"):
if stream.attrib["type"] == "image":
return True
return False
class PipelineDescriptor(object): class PipelineDescriptor(object):
def __init__(self, name, pipeline): def __init__(self, name, pipeline):
@ -70,7 +76,6 @@ if "win32" in sys.platform:
GST_VALIDATE_COMMAND += ".exe" GST_VALIDATE_COMMAND += ".exe"
GST_VALIDATE_TRANSCODING_COMMAND += ".exe" GST_VALIDATE_TRANSCODING_COMMAND += ".exe"
G_V_DISCOVERER_COMMAND += ".exe" G_V_DISCOVERER_COMMAND += ".exe"
G_V_DISCOVERER_COMMAND += " --discover-only"
# Some extension file for discovering results # Some extension file for discovering results
G_V_MEDIA_INFO_EXT = "media_info" G_V_MEDIA_INFO_EXT = "media_info"
@ -133,9 +138,9 @@ G_V_BLACKLISTED_TESTS = \
class GstValidateLaunchTest(GstValidateTest): class GstValidateLaunchTest(GstValidateTest):
def __init__(self, classname, options, reporter, pipeline_desc, def __init__(self, classname, options, reporter, pipeline_desc,
timeout=DEFAULT_TIMEOUT, scenario=None, file_infos=None): timeout=DEFAULT_TIMEOUT, scenario=None, media_xml=None):
try: try:
timeout = G_V_PROTOCOL_TIMEOUTS[file_infos.get("file-info", "protocol")] timeout = G_V_PROTOCOL_TIMEOUTS[media_xml.attrib["protocol"]]
except KeyError: except KeyError:
pass pass
@ -145,7 +150,7 @@ class GstValidateLaunchTest(GstValidateTest):
timeout=timeout) timeout=timeout)
self.pipeline_desc = pipeline_desc self.pipeline_desc = pipeline_desc
self.file_infos = file_infos self.media_xml = media_xml
def build_arguments(self): def build_arguments(self):
GstValidateTest.build_arguments(self) GstValidateTest.build_arguments(self)
@ -157,7 +162,7 @@ class GstValidateLaunchTest(GstValidateTest):
if sent_eos is not None: if sent_eos is not None:
t = time.time() t = time.time()
if ((t - sent_eos)) > 30: if ((t - sent_eos)) > 30:
if self.file_infos.get("file-info", "protocol") == Protocols.HLS: if self.media_xml.attrib["protocol"] == Protocols.HLS:
self.set_result(Result.PASSED, self.set_result(Result.PASSED,
"""Got no EOS 30 seconds after sending EOS, """Got no EOS 30 seconds after sending EOS,
in HLS known and tolerated issue: in HLS known and tolerated issue:
@ -170,13 +175,13 @@ class GstValidateLaunchTest(GstValidateTest):
class GstValidateMediaCheckTest(Test): class GstValidateMediaCheckTest(Test):
def __init__(self, classname, options, reporter, file_infos, uri, minfo_path, def __init__(self, classname, options, reporter, media_xml, uri, minfo_path,
timeout=DEFAULT_TIMEOUT): timeout=DEFAULT_TIMEOUT):
super(GstValidateMediaCheckTest, self).__init__(G_V_DISCOVERER_COMMAND, classname, super(GstValidateMediaCheckTest, self).__init__(G_V_DISCOVERER_COMMAND, classname,
options, reporter, options, reporter,
timeout=timeout) timeout=timeout)
self._uri = uri self._uri = uri
self.file_infos = file_infos self.media_xml = media_xml
self._media_info_path = minfo_path self._media_info_path = minfo_path
def build_arguments(self): def build_arguments(self):
@ -187,12 +192,12 @@ class GstValidateMediaCheckTest(Test):
class GstValidateTranscodingTest(GstValidateTest): class GstValidateTranscodingTest(GstValidateTest):
_scenarios = ScenarioManager() _scenarios = ScenarioManager()
def __init__(self, classname, options, reporter, def __init__(self, classname, options, reporter,
combination, uri, file_infos, timeout=DEFAULT_TIMEOUT, combination, uri, media_xml, timeout=DEFAULT_TIMEOUT,
scenario_name="play_15s"): scenario_name="play_15s"):
Loggable.__init__(self) Loggable.__init__(self)
file_dur = long(file_infos.get("media-info", "file-duration")) / GST_SECOND file_dur = long(media_xml.attrib["duration"]) / GST_SECOND
if file_dur < 30: if file_dur < 30:
self.debug("%s is short (%ds< 30 secs) playing it all" % (uri, file_dur)) self.debug("%s is short (%ds< 30 secs) playing it all" % (uri, file_dur))
scenario = None scenario = None
@ -200,7 +205,7 @@ class GstValidateTranscodingTest(GstValidateTest):
self.debug("%s is long (%ds > 30 secs) playing it all" % (uri, file_dur)) self.debug("%s is long (%ds > 30 secs) playing it all" % (uri, file_dur))
scenario = self._scenarios.get_scenario(scenario_name) scenario = self._scenarios.get_scenario(scenario_name)
try: try:
timeout = G_V_PROTOCOL_TIMEOUTS[file_infos.get("file-info", "protocol")] timeout = G_V_PROTOCOL_TIMEOUTS[media_xml.attrib["protocol"]]
except KeyError: except KeyError:
pass pass
@ -214,7 +219,7 @@ class GstValidateTranscodingTest(GstValidateTest):
options, reporter, scenario=scenario, timeout=timeout, options, reporter, scenario=scenario, timeout=timeout,
hard_timeout=hard_timeout) hard_timeout=hard_timeout)
self.file_infos = file_infos self.media_xml = media_xml
self.uri = uri self.uri = uri
self.combination = combination self.combination = combination
self.dest_file = "" self.dest_file = ""
@ -228,7 +233,7 @@ class GstValidateTranscodingTest(GstValidateTest):
self.dest_file = path2url(self.dest_file) self.dest_file = path2url(self.dest_file)
try: try:
video_restriction = G_V_PROTOCOL_VIDEO_RESTRICTION_CAPS[self.file_infos.get("file-info", "protocol")] video_restriction = G_V_PROTOCOL_VIDEO_RESTRICTION_CAPS[self.media_xml.attrib["protocol"]]
except KeyError: except KeyError:
video_restriction = None video_restriction = None
@ -246,7 +251,7 @@ class GstValidateTranscodingTest(GstValidateTest):
if sent_eos is not None: if sent_eos is not None:
t = time.time() t = time.time()
if ((t - sent_eos)) > 30: if ((t - sent_eos)) > 30:
if self.file_infos.get("file-info", "protocol") == Protocols.HLS: if self.media_xml.attrib["protocol"] == Protocols.HLS:
self.set_result(Result.PASSED, self.set_result(Result.PASSED,
"""Got no EOS 30 seconds after sending EOS, """Got no EOS 30 seconds after sending EOS,
in HLS known and tolerated issue: in HLS known and tolerated issue:
@ -260,7 +265,7 @@ class GstValidateTranscodingTest(GstValidateTest):
def check_results(self): def check_results(self):
if self.result is Result.PASSED and not self.scenario: if self.result is Result.PASSED and not self.scenario:
orig_duration = long(self.file_infos.get("media-info", "file-duration")) orig_duration = long(self.media_xml.attrib["duration"])
res, msg = compare_rendered_with_original(orig_duration, self.dest_file) res, msg = compare_rendered_with_original(orig_duration, self.dest_file)
self.set_result(res, msg) self.set_result(res, msg)
elif self.message == "": elif self.message == "":
@ -292,7 +297,7 @@ class GstValidateManager(TestsManager, Loggable):
self._add_playback_test(test_pipeline) self._add_playback_test(test_pipeline)
for uri, mediainfo in self._list_uris(): for uri, mediainfo in self._list_uris():
protocol = mediainfo.config.get("file-info", "protocol") protocol = mediainfo.media_xml.attrib["protocol"]
try: try:
timeout = G_V_PROTOCOL_TIMEOUTS[protocol] timeout = G_V_PROTOCOL_TIMEOUTS[protocol]
except KeyError: except KeyError:
@ -303,48 +308,47 @@ class GstValidateManager(TestsManager, Loggable):
self.add_test(GstValidateMediaCheckTest(classname, self.add_test(GstValidateMediaCheckTest(classname,
self.options, self.options,
self.reporter, self.reporter,
mediainfo.config, mediainfo.media_xml,
uri, uri,
mediainfo.path, mediainfo.path,
timeout=timeout)) timeout=timeout))
for uri, mediainfo in self._list_uris(): for uri, mediainfo in self._list_uris():
if mediainfo.config.getboolean("media-info", "is-image") is True:
if is_image(mediainfo.media_xml):
continue continue
for comb in G_V_ENCODING_TARGET_COMBINATIONS: for comb in G_V_ENCODING_TARGET_COMBINATIONS:
classname = "validate.%s.transcode.to_%s.%s" % (mediainfo.config.get("file-info", "protocol"), classname = "validate.%s.transcode.to_%s.%s" % (mediainfo.media_xml.attrib["protocol"],
str(comb).replace(' ', '_'), str(comb).replace(' ', '_'),
os.path.basename(uri).replace(".", "_")) os.path.basename(uri).replace(".", "_"))
self.add_test(GstValidateTranscodingTest(classname, self.add_test(GstValidateTranscodingTest(classname,
self.options, self.options,
self.reporter, self.reporter,
comb, uri, comb, uri,
mediainfo.config)) mediainfo.media_xml))
return self.tests return self.tests
def _check_discovering_info(self, media_info, uri=None): def _check_discovering_info(self, media_info, uri=None):
self.debug("Checking %s", media_info) self.debug("Checking %s", media_info)
config = ConfigParser.ConfigParser() media_xml = ET.parse(media_info).getroot()
f = open(media_info)
config.readfp(f)
try: try:
# Just testing that the vairous mandatory infos are present # Just testing that the vairous mandatory infos are present
caps = config.get("media-info", "caps") caps = media_xml.findall("streams")[0].attrib["caps"]
config.get("media-info", "file-duration") media_xml.attrib["duration"]
config.get("media-info", "seekable") media_xml.attrib["seekable"]
if uri is None: if uri is None:
uri = config.get("file-info", "uri") uri = media_xml.attrib["uri"]
config.set("file-info", "protocol", urlparse.urlparse(uri).scheme) media_xml.attrib["protocol"] = urlparse.urlparse(uri).scheme
for caps2, prot in G_V_CAPS_TO_PROTOCOL: for caps2, prot in G_V_CAPS_TO_PROTOCOL:
if caps2 == caps: if caps2 == caps:
config.set("file-info", "protocol", prot) media_xml.attrib["protocol"] = prot
break break
self._uris.append((uri, self._uris.append((uri,
NamedDic({"path": media_info, NamedDic({"path": media_info,
"config": config}))) "media_xml": media_xml})))
except ConfigParser.NoOptionError as e: except ConfigParser.NoOptionError as e:
self.debug("Exception: %s for %s", e, media_info) self.debug("Exception: %s for %s", e, media_info)
f.close()
def _discover_file(self, uri, fpath): def _discover_file(self, uri, fpath):
try: try:
@ -401,14 +405,14 @@ class GstValidateManager(TestsManager, Loggable):
def _add_playback_test(self, pipe_descriptor): def _add_playback_test(self, pipe_descriptor):
if pipe_descriptor.needs_uri(): if pipe_descriptor.needs_uri():
for uri, minfo in self._list_uris(): for uri, minfo in self._list_uris():
protocol = minfo.config.get("file-info", "protocol") protocol = minfo.media_xml.attrib["protocol"]
for scenario_name in G_V_SCENARIOS[protocol]: for scenario_name in G_V_SCENARIOS[protocol]:
scenario = self._scenarios.get_scenario(scenario_name) scenario = self._scenarios.get_scenario(scenario_name)
npipe = pipe_descriptor.get_pipeline(self.options, npipe = pipe_descriptor.get_pipeline(self.options,
protocol, protocol,
scenario, scenario,
uri) uri)
if minfo.config.getboolean("media-info", "seekable") is False: if not minfo.media_xml.attrib["seekable"] or is_image(minfo.media_xml):
self.debug("Do not run %s as %s does not support seeking", self.debug("Do not run %s as %s does not support seeking",
scenario, uri) scenario, uri)
continue continue
@ -423,7 +427,7 @@ class GstValidateManager(TestsManager, Loggable):
self.reporter, self.reporter,
npipe, npipe,
scenario=scenario, scenario=scenario,
file_infos=minfo.config) media_xml=minfo.media_xml)
) )
else: else:
self.add_test(GstValidateLaunchTest(self._get_fname(scenario, "testing"), self.add_test(GstValidateLaunchTest(self._get_fname(scenario, "testing"),
@ -435,8 +439,8 @@ class GstValidateManager(TestsManager, Loggable):
def needs_http_server(self): def needs_http_server(self):
for test in self.list_tests(): for test in self.list_tests():
if self._is_test_wanted(test): if self._is_test_wanted(test):
protocol = test.file_infos.get("file-info", "protocol") protocol = test.media_xml.attrib["protocol"]
uri = test.file_infos.get("file-info", "uri") uri = test.media_xml.attrib["uri"]
if protocol == Protocols.HTTP and \ if protocol == Protocols.HTTP and \
"127.0.0.1:%s" % (self.options.http_server_port) in uri: "127.0.0.1:%s" % (self.options.http_server_port) in uri:

View file

@ -191,6 +191,8 @@ def main():
l.sort() l.sort()
for test in l: for test in l:
printc(test) printc(test)
printc("\nNumber of tests: %d" % len (l), Colors.OKGREEN)
return 0 return 0
httpsrv = HTTPServer(options) httpsrv = HTTPServer(options)