validate:launcher: Move the MediaDescriptor class to the baseclasses.py file

This commit is contained in:
Thibault Saunier 2014-07-16 10:10:44 +02:00
parent 58256ab135
commit 87cc7da213
2 changed files with 81 additions and 75 deletions

View file

@ -21,11 +21,12 @@ import time
import urlparse import urlparse
import subprocess import subprocess
import ConfigParser import ConfigParser
import xml.etree.ElementTree as ET
from loggable import Loggable from loggable import Loggable
from baseclasses import GstValidateTest, TestsManager, Test, \ from baseclasses import GstValidateTest, TestsManager, Test, \
ScenarioManager, NamedDic, GstValidateTestsGenerator ScenarioManager, NamedDic, GstValidateTestsGenerator, \
GstValidateMediaDescriptor
from utils import MediaFormatCombination, get_profile,\ from utils import MediaFormatCombination, get_profile,\
path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \ path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \
compare_rendered_with_original, Protocols compare_rendered_with_original, Protocols
@ -43,10 +44,6 @@ if "win32" in sys.platform:
GST_VALIDATE_TRANSCODING_COMMAND += ".exe" GST_VALIDATE_TRANSCODING_COMMAND += ".exe"
G_V_DISCOVERER_COMMAND += ".exe" G_V_DISCOVERER_COMMAND += ".exe"
# Some extension file for discovering results
G_V_MEDIA_INFO_EXT = "media_info"
G_V_STREAM_INFO_EXT = "stream_info"
AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5 AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5
################################################# #################################################
@ -60,71 +57,6 @@ GST_VALIDATE_CAPS_TO_PROTOCOL = [("application/x-hls", Protocols.HLS)]
GST_VALIDATE_PROTOCOL_TIMEOUTS = {Protocols.HTTP: 120, GST_VALIDATE_PROTOCOL_TIMEOUTS = {Protocols.HTTP: 120,
Protocols.HLS: 240} Protocols.HLS: 240}
class GstValidateMediaDescriptor(Loggable):
def __init__(self, xml_path):
Loggable.__init__(self)
self._xml_path = xml_path
self.media_xml = ET.parse(xml_path).getroot()
# Sanity checks
self.media_xml.attrib["duration"]
self.media_xml.attrib["seekable"]
def get_media_filepath(self):
if self.get_protocol() == Protocols.FILE:
return self._xml_path.replace("." + G_V_MEDIA_INFO_EXT, "")
else:
return self._xml_path.replace("." + G_V_STREAM_INFO_EXT, "")
def get_caps(self):
return self.media_xml.findall("streams")[0].attrib["caps"]
def get_uri(self):
return self.media_xml.attrib["uri"]
def get_duration(self):
return long(self.media_xml.attrib["duration"])
def set_protocol(self, protocol):
self.media_xml.attrib["protocol"] = protocol
def get_protocol(self):
return self.media_xml.attrib["protocol"]
def is_seekable(self):
return self.media_xml.attrib["seekable"]
def is_image(self):
for stream in self.media_xml.findall("streams")[0].findall("stream"):
if stream.attrib["type"] == "image":
return True
return False
def get_num_tracks(self, track_type):
n = 0
for stream in self.media_xml.findall("streams")[0].findall("stream"):
if stream.attrib["type"] == track_type:
n += 1
return n
def is_compatible(self, scenario):
if scenario.seeks() and (not self.is_seekable() or self.is_image()):
self.debug("Do not run %s as %s does not support seeking",
scenario, self.get_uri())
return False
for track_type in ['audio', 'subtitle']:
if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
self.debug("%s -- %s | At least %s %s track needed < %s"
% (scenario, self.get_uri(), track_type,
scenario.get_min_tracks(track_type),
self.get_num_tracks(track_type)))
return False
return True
class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator): class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator):
def __init__(self, test_manager): def __init__(self, test_manager):
@ -550,13 +482,13 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
def _discover_file(self, uri, fpath): def _discover_file(self, uri, fpath):
try: try:
media_info = "%s.%s" % (fpath, G_V_MEDIA_INFO_EXT) media_info = "%s.%s" % (fpath, GstValidateMediaDescriptor.MEDIA_INFO_EXT)
args = G_V_DISCOVERER_COMMAND.split(" ") args = G_V_DISCOVERER_COMMAND.split(" ")
args.append(uri) args.append(uri)
if os.path.isfile(media_info): if os.path.isfile(media_info):
self._check_discovering_info(media_info, uri) self._check_discovering_info(media_info, uri)
return True return True
elif fpath.endswith(G_V_STREAM_INFO_EXT): elif fpath.endswith(GstValidateMediaDescriptor.STREAM_INFO_EXT):
self._check_discovering_info(fpath) self._check_discovering_info(fpath)
return True return True
elif self.options.generate_info: elif self.options.generate_info:
@ -596,7 +528,7 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""")
for f in files: for f in files:
fpath = os.path.join(path, root, f) fpath = os.path.join(path, root, f)
if os.path.isdir(fpath) or \ if os.path.isdir(fpath) or \
fpath.endswith(G_V_MEDIA_INFO_EXT) or\ fpath.endswith(GstValidateMediaDescriptor.MEDIA_INFO_EXT) or\
fpath.endswith(ScenarioManager.FILE_EXTENDION): fpath.endswith(ScenarioManager.FILE_EXTENDION):
continue continue
else: else:

View file

@ -31,8 +31,10 @@ import reporters
import ConfigParser import ConfigParser
from loggable import Loggable from loggable import Loggable
from optparse import OptionGroup from optparse import OptionGroup
import xml.etree.ElementTree as ET
from utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND from utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
Protocols
class Test(Loggable): class Test(Loggable):
@ -869,3 +871,75 @@ class GstValidateBaseTestManager(TestsManager):
def get_encoding_formats(self): def get_encoding_formats(self):
return self._encoding_formats return self._encoding_formats
class GstValidateMediaDescriptor(Loggable):
# Some extension file for discovering results
MEDIA_INFO_EXT = "media_info"
STREAM_INFO_EXT = "stream_info"
def __init__(self, xml_path):
Loggable.__init__(self)
self._xml_path = xml_path
self.media_xml = ET.parse(xml_path).getroot()
# Sanity checks
self.media_xml.attrib["duration"]
self.media_xml.attrib["seekable"]
def get_media_filepath(self):
if self.get_protocol() == Protocols.FILE:
return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
else:
return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")
def get_caps(self):
return self.media_xml.findall("streams")[0].attrib["caps"]
def get_uri(self):
return self.media_xml.attrib["uri"]
def get_duration(self):
return long(self.media_xml.attrib["duration"])
def set_protocol(self, protocol):
self.media_xml.attrib["protocol"] = protocol
def get_protocol(self):
return self.media_xml.attrib["protocol"]
def is_seekable(self):
return self.media_xml.attrib["seekable"]
def is_image(self):
for stream in self.media_xml.findall("streams")[0].findall("stream"):
if stream.attrib["type"] == "image":
return True
return False
def get_num_tracks(self, track_type):
n = 0
for stream in self.media_xml.findall("streams")[0].findall("stream"):
if stream.attrib["type"] == track_type:
n += 1
return n
def is_compatible(self, scenario):
if scenario.seeks() and (not self.is_seekable() or self.is_image()):
self.debug("Do not run %s as %s does not support seeking",
scenario, self.get_uri())
return False
for track_type in ['audio', 'subtitle']:
if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
self.debug("%s -- %s | At least %s %s track needed < %s"
% (scenario, self.get_uri(), track_type,
scenario.get_min_tracks(track_type),
self.get_num_tracks(track_type)))
return False
return True