gstreamer/validate/tools/apps/gst-validate.py

212 lines
7.9 KiB
Python
Raw Normal View History

#!/usr/bin/python
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
import os
import subprocess
import urlparse
import urllib
import ConfigParser
import logging
from testdefinitions import Test, TestsManager, DEFAULT_TIMEOUT
DEFAULT_GST_VALIDATE = "gst-validate-1.0"
DISCOVERER_COMMAND = ["gst-validate-media-check-1.0"]
MEDIA_INFO_EXT = "media_info"
STREAM_INFO = "stream_info"
SEEKING_REQUIERED_SCENARIO=["seek_forward", "seek_backward", "scrub_forward_seeking"]
SPECIAL_PROTOCOLS = [("application/x-hls", "hls")]
all_tests = {
"playback":
{
"pipelines": ["playbin uri=__uri__ audio_sink=autoaudiosink video_sink=autovideosink"],
"scenarios": ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"]
},
}
def path2url(path):
return urlparse.urljoin('file:', urllib.pathname2url(path))
class GstValidateTest(Test):
def __init__(self,
classname,
options,
reporter,
pipeline_desc,
scenario,
file_infos=None,
timeout=DEFAULT_TIMEOUT):
if file_infos is not None:
timeout = file_infos.get("media-info", "file-duration")
super(GstValidateTest, self).__init__(DEFAULT_GST_VALIDATE, classname,
options, reporter, scenario,
timeout)
self.pipeline_desc = pipeline_desc
self.file_infos = file_infos
def build_arguments(self):
Test.build_arguments(self)
self.add_arguments(self.pipeline_desc)
class GstValidateManager(TestsManager):
name = "validate"
def __init__(self):
super(GstValidateManager, self).__init__()
self._uris = []
def add_options(self, group):
group.add_option("-c", "--check-discovering", dest="check_discovering",
default=False, action="store_true",
help="Whether to check discovering results using %s"
% DISCOVERER_COMMAND[0])
def list_tests(self):
for mainname, tests in all_tests.iteritems():
name = "validate.%s" % (mainname)
for pipe in tests["pipelines"]:
for scenario in tests["scenarios"]:
self._add_test(name, scenario, pipe)
def _check_discovering_info(self, media_info, uri=None):
logging.debug("Checking %s", media_info)
config = ConfigParser.ConfigParser()
f = open(media_info)
config.readfp(f)
try:
# Just testing that the vairous mandatory infos are present
caps = config.get("media-info", "caps")
config.get("media-info", "file-duration")
config.get("media-info", "seekable")
if uri is None:
uri = config.get("file-info", "uri")
config.set("file-info", "protocol", urlparse.urlparse(uri).scheme)
for caps2, prot in SPECIAL_PROTOCOLS:
if caps2 == caps:
config.set("file-info", "protocol", prot)
break
self._uris.append((uri, config))
except ConfigParser.NoOptionError as e:
logging.debug("Exception: %s for %s", e, media_info)
pass
f.close()
def _discover_file(self, uri, fpath):
try:
media_info = "%s.%s" % (fpath, MEDIA_INFO_EXT)
args = list(DISCOVERER_COMMAND)
args.append(uri)
if os.path.isfile(media_info):
if self.options.check_discovering is False:
self._check_discovering_info(media_info, uri)
return True
else:
args.extend(["--expected-results", media_info])
else:
args.extend(["--output-file", media_info])
subprocess.check_output(args)
self._check_discovering_info(media_info, uri)
return True
except subprocess.CalledProcessError:
logging.debug("Exception: %s", e)
return False
def _list_uris(self):
if self._uris:
return self._uris
if not self.args:
if isinstance(self.options.paths, str):
self.options.paths = [os.path.join(self.options.paths)]
for path in self.options.paths:
for root, dirs, files in os.walk(path):
for f in files:
fpath = os.path.join(path, root,f)
if os.path.isdir(fpath) or fpath.endswith(MEDIA_INFO_EXT):
continue
elif fpath.endswith(STREAM_INFO):
self._check_discovering_info(fpath)
else:
self._discover_file(path2url(fpath), fpath)
logging.debug("Uris found: %s", self._uris)
return self._uris
def _get_fname(self, name, scenario, protocol=None):
if protocol is not None:
name = "%s.%s" % (name, protocol)
if scenario is not None and scenario.lower() != "none":
return "%s.%s" % (name, scenario)
return name
def _add_test(self, name, scenario, pipe):
if self.options.mute:
if "autovideosink" in pipe:
pipe = pipe.replace("autovideosink", "fakesink")
if "autoaudiosink" in pipe:
pipe = pipe.replace("autoaudiosink", "fakesink")
if "__uri__" in pipe:
for uri, config in self._list_uris():
npipe = pipe
if scenario in SEEKING_REQUIERED_SCENARIO:
if config.getboolean("media-info", "seekable") is False:
logging.debug("Do not run %s as %s does not support seeking",
scenario, uri)
continue
if self.options.mute:
# In case of seeking we need to make sure the pipeline
# is run sync, otherwize some tests will fail
npipe = pipe.replace("fakesink", "'fakesink sync=true'")
fname = "%s.%s" % (self._get_fname(name, scenario,
config.get("file-info", "protocol")),
os.path.basename(uri).replace(".", "_"))
logging.debug("Adding: %s", fname)
self.tests.append(GstValidateTest(fname,
self.options,
self.reporter,
npipe.replace("__uri__", uri),
scenario,
config)
)
else:
logging.debug("Adding: %s", name)
self.tests.append(GstValidateTest(self._get_fname(fname, scenario),
self.options,
self.reporter,
pipe,
scenario))