mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 12:11:13 +00:00
2921 lines
104 KiB
Python
2921 lines
104 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this program; if not, write to the
|
|
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
# Boston, MA 02110-1301, USA.
|
|
|
|
""" Class representing tests and test managers. """
|
|
|
|
from enum import Enum
|
|
import importlib.util
|
|
import json
|
|
import os
|
|
import sys
|
|
import re
|
|
import copy
|
|
import shlex
|
|
import socketserver
|
|
import struct
|
|
import time
|
|
from . import utils
|
|
import signal
|
|
import urllib.parse
|
|
import subprocess
|
|
import threading
|
|
import queue
|
|
import configparser
|
|
import xml
|
|
import random
|
|
import shutil
|
|
import uuid
|
|
from itertools import cycle
|
|
from fractions import Fraction
|
|
|
|
from .utils import GstCaps, which
|
|
from . import reporters
|
|
from . import loggable
|
|
from .loggable import Loggable
|
|
|
|
from collections import defaultdict
|
|
try:
|
|
from lxml import etree as ET
|
|
except ImportError:
|
|
import xml.etree.cElementTree as ET
|
|
|
|
|
|
from .vfb_server import get_virual_frame_buffer_server
|
|
from .httpserver import HTTPServer
|
|
from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
|
|
Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
|
|
check_bugs_resolution, is_tty
|
|
|
|
# The factor by which we increase the hard timeout when running inside
|
|
# Valgrind
|
|
GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
|
|
RR_TIMEOUT_FACTOR = 2
|
|
TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
|
|
# The error reported by valgrind when detecting errors
|
|
VALGRIND_ERROR_CODE = 20
|
|
|
|
VALIDATE_OVERRIDE_EXTENSION = ".override"
|
|
EXITING_SIGNALS = dict([(-getattr(signal, s), s) for s in [
|
|
'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
|
|
'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)])
|
|
EXITING_SIGNALS.update({139: "SIGSEGV"})
|
|
EXITING_SIGNALS.update({(v, k) for k, v in EXITING_SIGNALS.items()})
|
|
|
|
|
|
CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
|
|
|
|
|
|
class Test(Loggable):
|
|
|
|
""" A class representing a particular test. """
|
|
|
|
def __init__(self, application_name, classname, options,
|
|
reporter, duration=0, timeout=DEFAULT_TIMEOUT,
|
|
hard_timeout=None, extra_env_variables=None,
|
|
expected_issues=None, is_parallel=True,
|
|
workdir=None):
|
|
"""
|
|
@timeout: The timeout during which the value return by get_current_value
|
|
keeps being exactly equal
|
|
@hard_timeout: Max time the test can take in absolute
|
|
"""
|
|
Loggable.__init__(self)
|
|
self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
|
|
if hard_timeout:
|
|
self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
|
|
self.hard_timeout *= options.timeout_factor
|
|
else:
|
|
self.hard_timeout = hard_timeout
|
|
self.classname = classname
|
|
self.options = options
|
|
self.application = application_name
|
|
self.command = []
|
|
self.server_command = None
|
|
self.reporter = reporter
|
|
self.process = None
|
|
self.proc_env = None
|
|
self.thread = None
|
|
self.queue = None
|
|
self.duration = duration
|
|
self.stack_trace = None
|
|
self._uuid = None
|
|
if expected_issues is None:
|
|
self.expected_issues = []
|
|
elif not isinstance(expected_issues, list):
|
|
self.expected_issues = [expected_issues]
|
|
else:
|
|
self.expected_issues = expected_issues
|
|
|
|
extra_env_variables = extra_env_variables or {}
|
|
self.extra_env_variables = extra_env_variables
|
|
self.optional = False
|
|
self.is_parallel = is_parallel
|
|
self.generator = None
|
|
self.workdir = workdir
|
|
self.max_retries = 0
|
|
self.html_log = None
|
|
self.rr_logdir = None
|
|
|
|
self.clean()
|
|
|
|
def _generate_expected_issues(self):
|
|
return ''
|
|
|
|
def generate_expected_issues(self):
|
|
res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
|
|
+ 'in https://gitlab.freedesktop.org/gstreamer/ '\
|
|
+ 'or use a proper bug description]": {'
|
|
res += """
|
|
"tests": [
|
|
"%s"
|
|
],
|
|
"issues": [""" % (self.classname)
|
|
|
|
retcode = self.process.returncode if self.process else 0
|
|
if retcode != 0:
|
|
signame = EXITING_SIGNALS.get(retcode)
|
|
val = "'" + signame + "'" if signame else retcode
|
|
res += """\n {
|
|
'%s': %s,
|
|
'sometimes': True,
|
|
},""" % ("signame" if signame else "returncode", val)
|
|
|
|
res += self._generate_expected_issues()
|
|
res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
|
|
|
|
return res
|
|
|
|
def copy(self, nth=None):
|
|
copied_test = copy.copy(self)
|
|
if nth:
|
|
copied_test.classname += '_it' + str(nth)
|
|
copied_test._uuid = None
|
|
copied_test.options = copy.copy(self.options)
|
|
copied_test.options.logsdir = os.path.join(copied_test.options.logsdir, str(nth))
|
|
os.makedirs(copied_test.options.logsdir, exist_ok=True)
|
|
|
|
return copied_test
|
|
|
|
def clean(self):
|
|
self.kill_subprocess()
|
|
self.message = ""
|
|
self.error_str = ""
|
|
self.time_taken = 0.0
|
|
self._starting_time = None
|
|
self.result = Result.NOT_RUN
|
|
self.logfile = None
|
|
self.out = None
|
|
self.extra_logfiles = set()
|
|
self.__env_variable = []
|
|
self.kill_subprocess()
|
|
self.process = None
|
|
|
|
def __str__(self):
|
|
string = self.classname
|
|
if self.result != Result.NOT_RUN:
|
|
string += ": " + self.result
|
|
if self.result in [Result.FAILED, Result.TIMEOUT]:
|
|
string += " '%s'" % self.message
|
|
if not self.options.dump_on_failure:
|
|
if not self.options.redirect_logs and self.result != Result.PASSED:
|
|
string += self.get_logfile_repr()
|
|
else:
|
|
string = "\n==> %s" % string
|
|
|
|
return string
|
|
|
|
def add_env_variable(self, variable, value=None):
|
|
"""
|
|
Only useful so that the gst-validate-launcher can print the exact
|
|
right command line to reproduce the tests
|
|
"""
|
|
if value is None:
|
|
value = os.environ.get(variable, None)
|
|
|
|
if value is None:
|
|
return
|
|
|
|
self.__env_variable.append(variable)
|
|
|
|
@property
|
|
def _env_variable(self):
|
|
res = ""
|
|
if not self.options.verbose or self.options.verbose > 1:
|
|
for var in set(self.__env_variable):
|
|
if res:
|
|
res += " "
|
|
value = self.proc_env.get(var, None)
|
|
if value is not None:
|
|
res += "%s='%s'" % (var, value)
|
|
else:
|
|
res += "[Not displaying environment variables, rerun with -vv for the full command]"
|
|
|
|
return res
|
|
|
|
def open_logfile(self):
|
|
if self.out:
|
|
return
|
|
|
|
path = os.path.join(self.options.logsdir,
|
|
self.classname.replace(".", os.sep) + '.md')
|
|
mkdir(os.path.dirname(path))
|
|
self.logfile = path
|
|
|
|
if self.options.redirect_logs == 'stdout':
|
|
self.out = sys.stdout
|
|
elif self.options.redirect_logs == 'stderr':
|
|
self.out = sys.stderr
|
|
else:
|
|
self.out = open(path, 'w+')
|
|
|
|
def finalize_logfiles(self):
|
|
self.out.write("\n**Duration**: %s" % self.time_taken)
|
|
if not self.options.redirect_logs:
|
|
self.out.flush()
|
|
for logfile in self.extra_logfiles:
|
|
# Only copy over extra logfile content if it's below a certain threshold
|
|
# Avoid copying gigabytes of data if a lot of debugging is activated
|
|
if os.path.getsize(logfile) < 500 * 1024:
|
|
self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
|
|
os.path.basename(logfile), self.get_extra_log_content(logfile))
|
|
)
|
|
else:
|
|
self.out.write('\n\n## %s:\n\n**Log file too big.**\n %s\n\n Check file content directly\n\n' % (
|
|
os.path.basename(logfile), logfile)
|
|
)
|
|
|
|
if self.rr_logdir:
|
|
self.out.write('\n\n## rr trace:\n\n```\nrr replay %s/latest-trace\n```\n' % (
|
|
self.rr_logdir))
|
|
|
|
self.out.flush()
|
|
self.out.close()
|
|
|
|
if self.options.html:
|
|
self.html_log = os.path.splitext(self.logfile)[0] + '.html'
|
|
import commonmark
|
|
parser = commonmark.Parser()
|
|
with open(self.logfile) as f:
|
|
ast = parser.parse(f.read())
|
|
|
|
renderer = commonmark.HtmlRenderer()
|
|
html = renderer.render(ast)
|
|
with open(self.html_log, 'w') as f:
|
|
f.write(html)
|
|
|
|
self.out = None
|
|
|
|
def _get_file_content(self, file_name):
|
|
f = open(file_name, 'r+')
|
|
value = f.read()
|
|
f.close()
|
|
|
|
return value
|
|
|
|
def get_log_content(self):
|
|
return self._get_file_content(self.logfile)
|
|
|
|
def get_extra_log_content(self, extralog):
|
|
if extralog not in self.extra_logfiles:
|
|
return ""
|
|
|
|
return self._get_file_content(extralog)
|
|
|
|
def get_classname(self):
|
|
name = self.classname.split('.')[-1]
|
|
classname = self.classname.replace('.%s' % name, '')
|
|
|
|
return classname
|
|
|
|
def get_name(self):
|
|
return self.classname.split('.')[-1]
|
|
|
|
def get_uuid(self):
|
|
if self._uuid is None:
|
|
self._uuid = self.classname + str(uuid.uuid4())
|
|
return self._uuid
|
|
|
|
def add_arguments(self, *args):
|
|
self.command += args
|
|
|
|
def build_arguments(self):
|
|
self.add_env_variable("LD_PRELOAD")
|
|
self.add_env_variable("DISPLAY")
|
|
|
|
def add_stack_trace_to_logfile(self):
|
|
self.debug("Adding stack trace")
|
|
if self.options.rr:
|
|
return
|
|
|
|
trace_gatherer = BackTraceGenerator.get_default()
|
|
stack_trace = trace_gatherer.get_trace(self)
|
|
|
|
if not stack_trace:
|
|
return
|
|
|
|
info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
|
|
if self.options.redirect_logs:
|
|
print(info)
|
|
return
|
|
|
|
if self.options.xunit_file:
|
|
self.stack_trace = stack_trace
|
|
|
|
self.out.write(info)
|
|
self.out.flush()
|
|
|
|
def add_known_issue_information(self):
|
|
if self.expected_issues:
|
|
info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
|
|
json.dumps(self.expected_issues, indent=4)
|
|
)
|
|
else:
|
|
info = ""
|
|
|
|
info += "\n\n**You can mark the issues as 'known' by adding the " \
|
|
+ " following lines to the list of known issues**\n" \
|
|
+ "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
|
|
|
|
if self.options.redirect_logs:
|
|
print(info)
|
|
return
|
|
|
|
self.out.write(info)
|
|
|
|
def set_result(self, result, message="", error=""):
|
|
|
|
if not self.options.redirect_logs:
|
|
self.out.write("\n```\n")
|
|
self.out.flush()
|
|
|
|
self.debug("Setting result: %s (message: %s, error: %s)" % (result,
|
|
message, error))
|
|
|
|
if result is Result.TIMEOUT:
|
|
if self.options.debug is True:
|
|
if self.options.gdb:
|
|
printc("Timeout, you should process <ctrl>c to get into gdb",
|
|
Colors.FAIL)
|
|
# and wait here until gdb exits
|
|
self.process.communicate()
|
|
else:
|
|
pname = self.command[0]
|
|
input("%sTimeout happened on %s you can attach gdb doing:\n $gdb %s %d%s\n"
|
|
"Press enter to continue" % (Colors.FAIL, self.classname,
|
|
pname, self.process.pid, Colors.ENDC))
|
|
else:
|
|
self.add_stack_trace_to_logfile()
|
|
|
|
self.result = result
|
|
self.message = message
|
|
self.error_str = error
|
|
|
|
if result not in [Result.PASSED, Result.NOT_RUN, Result.SKIPPED]:
|
|
self.add_known_issue_information()
|
|
|
|
def check_results(self):
|
|
if self.result is Result.FAILED or self.result is Result.TIMEOUT:
|
|
return
|
|
|
|
self.debug("%s returncode: %s", self, self.process.returncode)
|
|
if self.options.rr and self.process.returncode == -signal.SIGPIPE:
|
|
self.set_result(Result.SKIPPED, "SIGPIPE received under `rr`, known issue.")
|
|
elif self.process.returncode == 0:
|
|
self.set_result(Result.PASSED)
|
|
elif self.process.returncode in EXITING_SIGNALS:
|
|
self.add_stack_trace_to_logfile()
|
|
self.set_result(Result.FAILED,
|
|
"Application exited with signal %s" % (
|
|
EXITING_SIGNALS[self.process.returncode]))
|
|
elif self.process.returncode == VALGRIND_ERROR_CODE:
|
|
self.set_result(Result.FAILED, "Valgrind reported errors")
|
|
else:
|
|
self.set_result(Result.FAILED,
|
|
"Application returned %d" % (self.process.returncode))
|
|
|
|
def get_current_value(self):
|
|
"""
|
|
Lets subclasses implement a nicer timeout measurement method
|
|
They should return some value with which we will compare
|
|
the previous and timeout if they are egual during self.timeout
|
|
seconds
|
|
"""
|
|
return Result.NOT_RUN
|
|
|
|
def process_update(self):
|
|
"""
|
|
Returns True when process has finished running or has timed out.
|
|
"""
|
|
|
|
if self.process is None:
|
|
# Process has not started running yet
|
|
return False
|
|
|
|
self.process.poll()
|
|
if self.process.returncode is not None:
|
|
return True
|
|
|
|
val = self.get_current_value()
|
|
|
|
self.debug("Got value: %s" % val)
|
|
if val is Result.NOT_RUN:
|
|
# The get_current_value logic is not implemented... dumb
|
|
# timeout
|
|
if time.time() - self.last_change_ts > self.timeout:
|
|
self.set_result(Result.TIMEOUT,
|
|
"Application timed out: %s secs" %
|
|
self.timeout,
|
|
"timeout")
|
|
return True
|
|
return False
|
|
elif val is Result.FAILED:
|
|
return True
|
|
elif val is Result.KNOWN_ERROR:
|
|
return True
|
|
|
|
self.log("New val %s" % val)
|
|
|
|
if val == self.last_val:
|
|
delta = time.time() - self.last_change_ts
|
|
self.debug("%s: Same value for %d/%d seconds" %
|
|
(self, delta, self.timeout))
|
|
if delta > self.timeout:
|
|
self.set_result(Result.TIMEOUT,
|
|
"Application timed out: %s secs" %
|
|
self.timeout,
|
|
"timeout")
|
|
return True
|
|
elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
|
|
self.set_result(
|
|
Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
|
|
return True
|
|
else:
|
|
self.last_change_ts = time.time()
|
|
self.last_val = val
|
|
|
|
return False
|
|
|
|
def get_subproc_env(self):
|
|
return os.environ.copy()
|
|
|
|
def kill_subprocess(self):
|
|
subprocs_id = None
|
|
if self.options.rr and self.process and self.process.returncode is None:
|
|
cmd = ["ps", "-o", "pid", "--ppid", str(self.process.pid), "--noheaders"]
|
|
try:
|
|
subprocs_id = [int(pid.strip('\n')) for
|
|
pid in subprocess.check_output(cmd).decode().split(' ') if pid]
|
|
except FileNotFoundError:
|
|
self.error("Ps not found, will probably not be able to get rr "
|
|
"working properly after we kill the process")
|
|
except subprocess.CalledProcessError as e:
|
|
self.error("Couldn't get rr subprocess pid: %s" % (e))
|
|
|
|
utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT, subprocs_id)
|
|
|
|
def run_external_checks(self):
|
|
pass
|
|
|
|
def thread_wrapper(self):
|
|
def enable_sigint():
|
|
# Restore the SIGINT handler for the child process (gdb) to ensure
|
|
# it can handle it.
|
|
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
|
|
|
if self.options.gdb and os.name != "nt":
|
|
preexec_fn = enable_sigint
|
|
else:
|
|
preexec_fn = None
|
|
|
|
self.process = subprocess.Popen(self.command,
|
|
stderr=self.out,
|
|
stdout=self.out,
|
|
env=self.proc_env,
|
|
cwd=self.workdir,
|
|
preexec_fn=preexec_fn)
|
|
self.process.wait()
|
|
if self.result is not Result.TIMEOUT:
|
|
if self.process.returncode == 0:
|
|
self.run_external_checks()
|
|
self.queue.put(None)
|
|
|
|
def get_valgrind_suppression_file(self, subdir, name):
|
|
p = get_data_file(subdir, name)
|
|
if p:
|
|
return p
|
|
|
|
self.error("Could not find any %s file" % name)
|
|
|
|
def get_valgrind_suppressions(self):
|
|
return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
|
|
|
|
def use_gdb(self, command):
|
|
if self.hard_timeout is not None:
|
|
self.hard_timeout *= GDB_TIMEOUT_FACTOR
|
|
self.timeout *= GDB_TIMEOUT_FACTOR
|
|
|
|
if not self.options.gdb_non_stop:
|
|
self.timeout = sys.maxsize
|
|
self.hard_timeout = sys.maxsize
|
|
|
|
args = ["gdb"]
|
|
if self.options.gdb_non_stop:
|
|
args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
|
|
args += ["--args"] + command
|
|
return args
|
|
|
|
def use_rr(self, command, subenv):
|
|
command = ["rr", 'record', '-h'] + command
|
|
|
|
self.timeout *= RR_TIMEOUT_FACTOR
|
|
self.rr_logdir = os.path.join(self.options.logsdir, self.classname.replace(".", os.sep), 'rr-logs')
|
|
subenv['_RR_TRACE_DIR'] = self.rr_logdir
|
|
try:
|
|
shutil.rmtree(self.rr_logdir, ignore_errors=False, onerror=None)
|
|
except FileNotFoundError:
|
|
pass
|
|
self.add_env_variable('_RR_TRACE_DIR', self.rr_logdir)
|
|
|
|
return command
|
|
|
|
def use_valgrind(self, command, subenv):
|
|
vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
|
|
self.extra_logfiles.add(vglogsfile)
|
|
|
|
vg_args = []
|
|
|
|
for o, v in [('trace-children', 'yes'),
|
|
('tool', 'memcheck'),
|
|
('leak-check', 'full'),
|
|
('leak-resolution', 'high'),
|
|
# TODO: errors-for-leak-kinds should be set to all instead of definite
|
|
# and all false positives should be added to suppression
|
|
# files.
|
|
('errors-for-leak-kinds', 'definite,indirect'),
|
|
('show-leak-kinds', 'definite,indirect'),
|
|
('show-possibly-lost', 'no'),
|
|
('num-callers', '20'),
|
|
('error-exitcode', str(VALGRIND_ERROR_CODE)),
|
|
('gen-suppressions', 'all')]:
|
|
vg_args.append("--%s=%s" % (o, v))
|
|
|
|
if not self.options.redirect_logs:
|
|
vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
|
|
self.extra_logfiles.add(vglogsfile)
|
|
vg_args.append("--%s=%s" % ('log-file', vglogsfile))
|
|
|
|
for supp in self.get_valgrind_suppressions():
|
|
vg_args.append("--suppressions=%s" % supp)
|
|
|
|
command = ["valgrind"] + vg_args + command
|
|
|
|
# Tune GLib's memory allocator to be more valgrind friendly
|
|
subenv['G_DEBUG'] = 'gc-friendly'
|
|
subenv['G_SLICE'] = 'always-malloc'
|
|
|
|
if self.hard_timeout is not None:
|
|
self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
|
|
self.timeout *= VALGRIND_TIMEOUT_FACTOR
|
|
|
|
# Enable 'valgrind.config'
|
|
self.add_validate_config(get_data_file(
|
|
'data', 'valgrind.config'), subenv)
|
|
if subenv == self.proc_env:
|
|
self.add_env_variable('G_DEBUG', 'gc-friendly')
|
|
self.add_env_variable('G_SLICE', 'always-malloc')
|
|
self.add_env_variable('GST_VALIDATE_CONFIG',
|
|
self.proc_env['GST_VALIDATE_CONFIG'])
|
|
|
|
return command
|
|
|
|
def add_validate_config(self, config, subenv=None):
|
|
if not subenv:
|
|
subenv = self.extra_env_variables
|
|
|
|
cconf = subenv.get('GST_VALIDATE_CONFIG', "")
|
|
paths = [c for c in cconf.split(os.pathsep) if c] + [config]
|
|
subenv['GST_VALIDATE_CONFIG'] = os.pathsep.join(paths)
|
|
|
|
def launch_server(self):
|
|
return None
|
|
|
|
def get_logfile_repr(self):
|
|
if not self.options.redirect_logs:
|
|
if self.html_log:
|
|
log = self.html_log
|
|
else:
|
|
log = self.logfile
|
|
|
|
if CI_ARTIFACTS_URL:
|
|
log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
|
|
|
|
return "\n Log: %s" % (log)
|
|
|
|
return ""
|
|
|
|
def get_command_repr(self):
|
|
message = "%s %s" % (self._env_variable, ' '.join(
|
|
shlex.quote(arg) for arg in self.command))
|
|
if self.server_command:
|
|
message = "%s & %s" % (self.server_command, message)
|
|
|
|
return message
|
|
|
|
def test_start(self, queue):
|
|
self.open_logfile()
|
|
|
|
self.server_command = self.launch_server()
|
|
self.queue = queue
|
|
self.command = [self.application]
|
|
self._starting_time = time.time()
|
|
self.build_arguments()
|
|
self.proc_env = self.get_subproc_env()
|
|
|
|
for var, value in list(self.extra_env_variables.items()):
|
|
value = self.proc_env.get(var, '') + os.pathsep + value
|
|
self.proc_env[var] = value.strip(os.pathsep)
|
|
self.add_env_variable(var, self.proc_env[var])
|
|
|
|
if self.options.gdb:
|
|
self.command = self.use_gdb(self.command)
|
|
|
|
self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
|
|
# Make the gst-validate executable ignore SIGINT while gdb is
|
|
# running.
|
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
|
|
|
if self.options.valgrind:
|
|
self.command = self.use_valgrind(self.command, self.proc_env)
|
|
|
|
if self.options.rr:
|
|
self.command = self.use_rr(self.command, self.proc_env)
|
|
|
|
if not self.options.redirect_logs:
|
|
self.out.write("# `%s`\n\n"
|
|
"## Command\n\n``` bash\n%s\n```\n\n" % (
|
|
self.classname, self.get_command_repr()))
|
|
self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
|
|
self.out.flush()
|
|
else:
|
|
message = "Launching: %s%s\n" \
|
|
" Command: %s\n" % (Colors.ENDC, self.classname,
|
|
self.get_command_repr())
|
|
printc(message, Colors.OKBLUE)
|
|
|
|
self.thread = threading.Thread(target=self.thread_wrapper)
|
|
self.thread.start()
|
|
|
|
self.last_val = 0
|
|
self.last_change_ts = time.time()
|
|
self.start_ts = time.time()
|
|
|
|
def _dump_log_file(self, logfile):
|
|
if which('bat'):
|
|
try:
|
|
subprocess.check_call(['bat', '-H', '1', '--paging=never', logfile])
|
|
return
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
pass
|
|
|
|
with open(logfile, 'r') as fin:
|
|
for line in fin.readlines():
|
|
print('> ' + line, end='')
|
|
|
|
def _dump_log_files(self):
|
|
self._dump_log_file(self.logfile)
|
|
|
|
def copy_logfiles(self, extra_folder="flaky_tests"):
|
|
path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
|
|
self.classname.replace(".", os.sep)))
|
|
mkdir(path)
|
|
self.logfile = shutil.copy(self.logfile, path)
|
|
extra_logs = []
|
|
for logfile in self.extra_logfiles:
|
|
extra_logs.append(shutil.copy(logfile, path))
|
|
self.extra_logfiles = extra_logs
|
|
|
|
def test_end(self, retry_on_failures=False):
|
|
self.kill_subprocess()
|
|
self.thread.join()
|
|
self.time_taken = time.time() - self._starting_time
|
|
|
|
if self.options.gdb:
|
|
signal.signal(signal.SIGINT, self.previous_sigint_handler)
|
|
|
|
self.finalize_logfiles()
|
|
if self.options.dump_on_failure and not retry_on_failures and not self.max_retries:
|
|
if self.result not in [Result.PASSED, Result.KNOWN_ERROR, Result.NOT_RUN]:
|
|
self._dump_log_files()
|
|
|
|
# Only keep around env variables we need later
|
|
clean_env = {}
|
|
for n in self.__env_variable:
|
|
clean_env[n] = self.proc_env.get(n, None)
|
|
self.proc_env = clean_env
|
|
|
|
# Don't keep around JSON report objects, they were processed
|
|
# in check_results already
|
|
self.reports = []
|
|
|
|
return self.result
|
|
|
|
|
|
class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
|
pass
|
|
|
|
|
|
class GstValidateListener(socketserver.BaseRequestHandler, Loggable):
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
Loggable.__init__(self, "GstValidateListener")
|
|
|
|
def handle(self):
|
|
"""Implements BaseRequestHandler handle method"""
|
|
test = None
|
|
self.logCategory = "GstValidateListener"
|
|
while True:
|
|
raw_len = self.request.recv(4)
|
|
if raw_len == b'':
|
|
return
|
|
msglen = struct.unpack('>I', raw_len)[0]
|
|
e = None
|
|
raw_msg = bytes()
|
|
while msglen != len(raw_msg):
|
|
raw_msg += self.request.recv(msglen - len(raw_msg))
|
|
if e is not None:
|
|
continue
|
|
try:
|
|
msg = raw_msg.decode('utf-8', 'ignore')
|
|
except UnicodeDecodeError as e:
|
|
self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
|
|
continue
|
|
|
|
if msg == '':
|
|
return
|
|
|
|
try:
|
|
obj = json.loads(msg)
|
|
except json.decoder.JSONDecodeError as e:
|
|
self.error("%s Could not decode message: %s - %s" % (test.classname if test else "unknown", msg, e))
|
|
continue
|
|
|
|
if test is None:
|
|
# First message must contain the uuid
|
|
uuid = obj.get("uuid", None)
|
|
if uuid is None:
|
|
return
|
|
# Find test from launcher
|
|
for t in self.server.launcher.tests:
|
|
if uuid == t.get_uuid():
|
|
test = t
|
|
break
|
|
if test is None:
|
|
self.server.launcher.error(
|
|
"Could not find test for UUID %s" % uuid)
|
|
return
|
|
|
|
obj_type = obj.get("type", '')
|
|
if obj_type == 'position':
|
|
test.set_position(obj['position'], obj['duration'],
|
|
obj['speed'])
|
|
elif obj_type == 'buffering':
|
|
test.set_position(obj['position'], 100)
|
|
elif obj_type == 'action':
|
|
test.add_action_execution(obj)
|
|
# Make sure that action is taken into account when checking if process
|
|
# is updating
|
|
test.position += 1
|
|
elif obj_type == 'action-done':
|
|
# Make sure that action end is taken into account when checking if process
|
|
# is updating
|
|
test.position += 1
|
|
if test.actions_infos:
|
|
test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
|
|
elif obj_type == 'report':
|
|
test.add_report(obj)
|
|
elif obj_type == 'skip-test':
|
|
test.set_result(Result.SKIPPED)
|
|
|
|
|
|
class GstValidateTest(Test):
|
|
|
|
""" A class representing a particular test. """
|
|
HARD_TIMEOUT_FACTOR = 5
|
|
fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
|
|
needs_gst_inspect = set()
|
|
|
|
def __init__(self, application_name, classname,
|
|
options, reporter, duration=0,
|
|
timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
|
|
media_descriptor=None, extra_env_variables=None,
|
|
expected_issues=None, workdir=None, **kwargs):
|
|
|
|
extra_env_variables = extra_env_variables or {}
|
|
|
|
if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
|
|
if timeout:
|
|
hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
|
|
elif duration:
|
|
hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
|
|
else:
|
|
hard_timeout = None
|
|
|
|
# If we are running from source, use the -debug version of the
|
|
# application which is using rpath instead of libtool's wrappers. It's
|
|
# slightly faster to start and will not confuse valgrind.
|
|
debug = '%s-debug' % application_name
|
|
p = look_for_file_in_source_dir('tools', debug)
|
|
if p:
|
|
application_name = p
|
|
|
|
self.reports = []
|
|
self.position = -1
|
|
self.media_duration = -1
|
|
self.speed = 1.0
|
|
self.actions_infos = []
|
|
self.media_descriptor = media_descriptor
|
|
self.server = None
|
|
self.criticals = []
|
|
|
|
override_path = self.get_override_file(media_descriptor)
|
|
if override_path:
|
|
if extra_env_variables:
|
|
if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
|
|
extra_env_variables[
|
|
"GST_VALIDATE_OVERRIDE"] += os.path.pathsep
|
|
|
|
extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
|
|
|
|
super().__init__(application_name,
|
|
classname,
|
|
options, reporter,
|
|
duration=duration,
|
|
timeout=timeout,
|
|
hard_timeout=hard_timeout,
|
|
extra_env_variables=extra_env_variables,
|
|
expected_issues=expected_issues,
|
|
workdir=workdir,
|
|
**kwargs)
|
|
if media_descriptor and media_descriptor.get_media_filepath():
|
|
config_file = os.path.join(media_descriptor.get_media_filepath() + '.config')
|
|
if os.path.isfile(config_file):
|
|
self.add_validate_config(config_file, extra_env_variables)
|
|
|
|
if scenario is None or scenario.name.lower() == "none":
|
|
self.scenario = None
|
|
else:
|
|
self.scenario = scenario
|
|
|
|
def needs_http_server(self):
|
|
if self.media_descriptor is None:
|
|
return False
|
|
|
|
protocol = self.media_descriptor.get_protocol()
|
|
uri = self.media_descriptor.get_uri()
|
|
uri_requires_http_server = False
|
|
if uri:
|
|
if 'http-server-port' in uri:
|
|
expanded_uri = uri % {
|
|
'http-server-port': self.options.http_server_port}
|
|
uri_requires_http_server = expanded_uri.find(
|
|
"127.0.0.1:%s" % self.options.http_server_port) != -1
|
|
if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] or uri_requires_http_server:
|
|
return True
|
|
|
|
return False
|
|
|
|
def kill_subprocess(self):
|
|
Test.kill_subprocess(self)
|
|
|
|
def add_report(self, report):
|
|
self.reports.append(report)
|
|
|
|
def set_position(self, position, duration, speed=None):
|
|
self.position = position
|
|
self.media_duration = duration
|
|
if speed:
|
|
self.speed = speed
|
|
|
|
def add_action_execution(self, action_infos):
|
|
self.actions_infos.append(action_infos)
|
|
|
|
def get_override_file(self, media_descriptor):
|
|
if media_descriptor:
|
|
if media_descriptor.get_path():
|
|
override_path = os.path.splitext(media_descriptor.get_path())[
|
|
0] + VALIDATE_OVERRIDE_EXTENSION
|
|
if os.path.exists(override_path):
|
|
return override_path
|
|
|
|
return None
|
|
|
|
def get_current_position(self):
|
|
return self.position
|
|
|
|
def get_current_value(self):
|
|
return self.position
|
|
|
|
def get_subproc_env(self):
|
|
subproc_env = os.environ.copy()
|
|
|
|
if self.options.validate_default_config:
|
|
self.add_validate_config(self.options.validate_default_config,
|
|
subproc_env, )
|
|
|
|
subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
|
|
subproc_env["GST_VALIDATE_LOGSDIR"] = self.options.logsdir
|
|
|
|
if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
|
|
gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
|
|
self.extra_logfiles.add(gstlogsfile)
|
|
subproc_env["GST_DEBUG_FILE"] = gstlogsfile
|
|
|
|
if self.options.no_color:
|
|
subproc_env["GST_DEBUG_NO_COLOR"] = '1'
|
|
|
|
# Ensure XInitThreads is called, see bgo#731525
|
|
subproc_env['GST_GL_XINITTHREADS'] = '1'
|
|
self.add_env_variable('GST_GL_XINITTHREADS', '1')
|
|
subproc_env['GST_XINITTHREADS'] = '1'
|
|
self.add_env_variable('GST_XINITTHREADS', '1')
|
|
|
|
if self.scenario is not None:
|
|
scenario = self.scenario.get_execution_name()
|
|
subproc_env["GST_VALIDATE_SCENARIO"] = scenario
|
|
self.add_env_variable("GST_VALIDATE_SCENARIO",
|
|
subproc_env["GST_VALIDATE_SCENARIO"])
|
|
else:
|
|
try:
|
|
del subproc_env["GST_VALIDATE_SCENARIO"]
|
|
except KeyError:
|
|
pass
|
|
|
|
if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
|
|
dotfilesdir = os.path.join(self.options.logsdir,
|
|
self.classname.replace(".", os.sep) + '.pipelines_dot_files')
|
|
mkdir(dotfilesdir)
|
|
subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
|
|
if CI_ARTIFACTS_URL:
|
|
dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
|
|
self.options.logsdir)
|
|
subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
|
|
|
|
return subproc_env
|
|
|
|
def clean(self):
|
|
Test.clean(self)
|
|
self.reports = []
|
|
self.position = -1
|
|
self.media_duration = -1
|
|
self.speed = 1.0
|
|
self.actions_infos = []
|
|
|
|
def build_arguments(self):
|
|
super(GstValidateTest, self).build_arguments()
|
|
if "GST_VALIDATE" in os.environ:
|
|
self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
|
|
|
|
if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
|
|
self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
|
|
os.environ["GST_VALIDATE_SCENARIOS_PATH"])
|
|
|
|
self.add_env_variable("GST_VALIDATE_CONFIG")
|
|
self.add_env_variable("GST_VALIDATE_OVERRIDE")
|
|
|
|
def get_extra_log_content(self, extralog):
|
|
value = Test.get_extra_log_content(self, extralog)
|
|
|
|
return value
|
|
|
|
def report_matches_expected_issues(self, report, expected_issue):
|
|
for key in ['bug', 'bugs', 'sometimes']:
|
|
if key in expected_issue:
|
|
del expected_issue[key]
|
|
for key, value in list(report.items()):
|
|
if key in expected_issue:
|
|
if not re.findall(expected_issue[key], str(value)):
|
|
return False
|
|
expected_issue.pop(key)
|
|
|
|
if "can-happen-several-times" in expected_issue:
|
|
expected_issue.pop("can-happen-several-times")
|
|
return not bool(expected_issue)
|
|
|
|
def check_reported_issues(self, expected_issues):
|
|
ret = []
|
|
expected_retcode = [0]
|
|
for report in self.reports:
|
|
found = None
|
|
for expected_issue in expected_issues:
|
|
if self.report_matches_expected_issues(report,
|
|
expected_issue.copy()):
|
|
found = expected_issue
|
|
break
|
|
|
|
if found is not None:
|
|
if not found.get('can-happen-several-times', False):
|
|
expected_issues.remove(found)
|
|
if report['level'] == 'critical':
|
|
if found.get('sometimes', True) and isinstance(expected_retcode, list):
|
|
expected_retcode.append(18)
|
|
else:
|
|
expected_retcode = [18]
|
|
elif report['level'] == 'critical':
|
|
ret.append(report)
|
|
|
|
if not ret:
|
|
return None, expected_issues, expected_retcode
|
|
|
|
return ret, expected_issues, expected_retcode
|
|
|
|
def check_expected_issue(self, expected_issue):
|
|
res = True
|
|
msg = ''
|
|
expected_symbols = expected_issue.get('stacktrace_symbols')
|
|
if expected_symbols:
|
|
trace_gatherer = BackTraceGenerator.get_default()
|
|
stack_trace = trace_gatherer.get_trace(self)
|
|
|
|
if stack_trace:
|
|
if not isinstance(expected_symbols, list):
|
|
expected_symbols = [expected_symbols]
|
|
|
|
not_found_symbols = [s for s in expected_symbols
|
|
if s not in stack_trace]
|
|
if not_found_symbols:
|
|
msg = " Expected symbols '%s' not found in stack trace " % (
|
|
not_found_symbols)
|
|
res = False
|
|
else:
|
|
msg += " No stack trace available, could not verify symbols "
|
|
|
|
_, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
|
|
if not_found_expected_issues:
|
|
mandatory_failures = [f for f in not_found_expected_issues
|
|
if not f.get('sometimes', True)]
|
|
if mandatory_failures:
|
|
msg = " (Expected issues not found: %s) " % mandatory_failures
|
|
res = False
|
|
|
|
return msg, res
|
|
|
|
def check_expected_timeout(self, expected_timeout):
|
|
msg = "Expected timeout happened. "
|
|
result = Result.PASSED
|
|
message = expected_timeout.get('message')
|
|
if message:
|
|
if not re.findall(message, self.message):
|
|
result = Result.FAILED
|
|
msg = "Expected timeout message: %s got %s " % (
|
|
message, self.message)
|
|
|
|
stack_msg, stack_res = self.check_expected_issue(expected_timeout)
|
|
if not stack_res:
|
|
result = Result.TIMEOUT
|
|
msg += stack_msg
|
|
|
|
return result, msg
|
|
|
|
def check_results(self):
|
|
if self.result in [Result.FAILED, Result.PASSED, Result.SKIPPED]:
|
|
return
|
|
|
|
self.debug("%s returncode: %s", self, self.process.returncode)
|
|
expected_issues = copy.deepcopy(self.expected_issues)
|
|
if self.options.rr:
|
|
# signal.SIGPPIPE is 13 but it sometimes isn't present in python for some reason.
|
|
expected_issues.append({"returncode": -13, "sometimes": True})
|
|
self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
|
|
expected_timeout = None
|
|
expected_signal = None
|
|
for i, f in enumerate(not_found_expected_issues):
|
|
returncode = f.get('returncode', [])
|
|
if not isinstance(returncode, list):
|
|
returncode = [returncode]
|
|
|
|
if f.get('signame'):
|
|
signames = f['signame']
|
|
if not isinstance(signames, list):
|
|
signames = [signames]
|
|
|
|
returncode = [EXITING_SIGNALS[signame] for signame in signames]
|
|
|
|
if returncode:
|
|
if 'sometimes' in f:
|
|
returncode.append(0)
|
|
expected_returncode = returncode
|
|
expected_signal = f
|
|
elif f.get("timeout"):
|
|
expected_timeout = f
|
|
|
|
not_found_expected_issues = [f for f in not_found_expected_issues
|
|
if not f.get('returncode') and not f.get('signame')]
|
|
|
|
msg = ""
|
|
result = Result.PASSED
|
|
if self.result == Result.TIMEOUT:
|
|
with open(self.logfile) as f:
|
|
signal_fault_info = self.fault_sig_regex.findall(f.read())
|
|
if signal_fault_info:
|
|
result = Result.FAILED
|
|
msg = signal_fault_info[0]
|
|
elif expected_timeout:
|
|
not_found_expected_issues.remove(expected_timeout)
|
|
result, msg = self.check_expected_timeout(expected_timeout)
|
|
else:
|
|
return
|
|
elif self.process.returncode in EXITING_SIGNALS:
|
|
msg = "Application exited with signal %s" % (
|
|
EXITING_SIGNALS[self.process.returncode])
|
|
if self.process.returncode not in expected_returncode:
|
|
result = Result.FAILED
|
|
else:
|
|
if expected_signal:
|
|
stack_msg, stack_res = self.check_expected_issue(
|
|
expected_signal)
|
|
if not stack_res:
|
|
msg += stack_msg
|
|
result = Result.FAILED
|
|
self.add_stack_trace_to_logfile()
|
|
elif self.process.returncode == VALGRIND_ERROR_CODE:
|
|
msg = "Valgrind reported errors "
|
|
result = Result.FAILED
|
|
elif self.process.returncode not in expected_returncode:
|
|
msg = "Application returned %s " % self.process.returncode
|
|
if expected_returncode != [0]:
|
|
msg += "(expected %s) " % expected_returncode
|
|
result = Result.FAILED
|
|
|
|
if self.criticals:
|
|
msg += "(critical errors: [%s]) " % ', '.join(set([c['summary']
|
|
for c in self.criticals]))
|
|
result = Result.FAILED
|
|
|
|
if not_found_expected_issues:
|
|
mandatory_failures = [f for f in not_found_expected_issues
|
|
if not f.get('sometimes', True)]
|
|
|
|
if mandatory_failures:
|
|
msg += " (Expected errors not found: %s) " % mandatory_failures
|
|
result = Result.FAILED
|
|
elif self.expected_issues:
|
|
msg += ' %s(Expected errors occurred: %s)%s' % (Colors.OKBLUE,
|
|
self.expected_issues,
|
|
Colors.ENDC)
|
|
result = Result.KNOWN_ERROR
|
|
|
|
if result == Result.PASSED:
|
|
for report in self.reports:
|
|
if report["level"] == "expected":
|
|
result = Result.KNOWN_ERROR
|
|
break
|
|
|
|
self.set_result(result, msg.strip())
|
|
|
|
def _generate_expected_issues(self):
|
|
res = ""
|
|
self.criticals = self.criticals or []
|
|
if self.result == Result.TIMEOUT:
|
|
res += """ {
|
|
'timeout': True,
|
|
'sometimes': True,
|
|
},"""
|
|
|
|
for report in self.criticals:
|
|
res += "\n%s{" % (" " * 12)
|
|
|
|
for key, value in report.items():
|
|
if key == "type":
|
|
continue
|
|
if value is None:
|
|
continue
|
|
res += '\n%s%s"%s": "%s",' % (
|
|
" " * 16, "# " if key == "details" else "",
|
|
key, value.replace('\n', '\\n'))
|
|
|
|
res += "\n%s}," % (" " * 12)
|
|
|
|
return res
|
|
|
|
def get_valgrind_suppressions(self):
|
|
result = super(GstValidateTest, self).get_valgrind_suppressions()
|
|
result.extend(utils.get_gst_build_valgrind_suppressions())
|
|
return result
|
|
|
|
|
|
class VariableFramerateMode(Enum):
|
|
DISABLED = 1
|
|
ENABLED = 2
|
|
AUTO = 3
|
|
|
|
|
|
class GstValidateEncodingTestInterface(object):
|
|
DURATION_TOLERANCE = GST_SECOND / 4
|
|
|
|
def __init__(self, combination, media_descriptor, duration_tolerance=None):
|
|
super(GstValidateEncodingTestInterface, self).__init__()
|
|
|
|
self.media_descriptor = media_descriptor
|
|
self.combination = combination
|
|
self.dest_file = ""
|
|
|
|
self._duration_tolerance = duration_tolerance
|
|
if duration_tolerance is None:
|
|
self._duration_tolerance = self.DURATION_TOLERANCE
|
|
|
|
def get_current_size(self):
|
|
try:
|
|
size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
|
|
except OSError:
|
|
return None
|
|
|
|
self.debug("Size: %s" % size)
|
|
return size
|
|
|
|
def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
|
|
audio_restriction=None, audio_presence=0,
|
|
video_presence=0,
|
|
variable_framerate=VariableFramerateMode.DISABLED):
|
|
|
|
ret = ""
|
|
if muxer:
|
|
ret += muxer
|
|
ret += ":"
|
|
if venc:
|
|
if video_restriction is not None:
|
|
ret = ret + video_restriction + '->'
|
|
ret += venc
|
|
props = ""
|
|
if video_presence:
|
|
props += 'presence=%s|' % str(video_presence)
|
|
if variable_framerate == VariableFramerateMode.AUTO:
|
|
if video_restriction and "framerate" in video_restriction:
|
|
variable_framerate = VariableFramerateMode.DISABLED
|
|
else:
|
|
variable_framerate = VariableFramerateMode.ENABLED
|
|
if variable_framerate == VariableFramerateMode.ENABLED:
|
|
props += 'variable-framerate=true|'
|
|
if props:
|
|
ret = ret + '|' + props[:-1]
|
|
if aenc:
|
|
ret += ":"
|
|
if audio_restriction is not None:
|
|
ret = ret + audio_restriction + '->'
|
|
ret += aenc
|
|
if audio_presence:
|
|
ret = ret + '|' + str(audio_presence)
|
|
|
|
return ret.replace("::", ":")
|
|
|
|
def get_profile(self, video_restriction=None, audio_restriction=None,
|
|
variable_framerate=VariableFramerateMode.DISABLED):
|
|
vcaps = self.combination.get_video_caps()
|
|
acaps = self.combination.get_audio_caps()
|
|
if video_restriction is None:
|
|
video_restriction = self.combination.video_restriction
|
|
if audio_restriction is None:
|
|
audio_restriction = self.combination.audio_restriction
|
|
if self.media_descriptor is not None:
|
|
if self.combination.video == "theora":
|
|
# Theoraenc doesn't support variable framerate, make sure to avoid them
|
|
framerate = self.media_descriptor.get_framerate()
|
|
if framerate == Fraction(0, 1):
|
|
framerate = Fraction(30, 1)
|
|
restriction = utils.GstCaps.new_from_str(video_restriction or "video/x-raw")
|
|
for struct, _ in restriction:
|
|
if struct.get("framerate") is None:
|
|
struct.set("framerate", struct.FRACTION_TYPE, framerate)
|
|
video_restriction = str(restriction)
|
|
|
|
video_presence = self.media_descriptor.get_num_tracks("video")
|
|
if video_presence == 0:
|
|
vcaps = None
|
|
|
|
audio_presence = self.media_descriptor.get_num_tracks("audio")
|
|
if audio_presence == 0:
|
|
acaps = None
|
|
|
|
return self._get_profile_full(self.combination.get_muxer_caps(),
|
|
vcaps, acaps,
|
|
audio_presence=audio_presence,
|
|
video_presence=video_presence,
|
|
video_restriction=video_restriction,
|
|
audio_restriction=audio_restriction,
|
|
variable_framerate=variable_framerate)
|
|
|
|
def _clean_caps(self, caps):
|
|
"""
|
|
Returns a list of key=value or structure name, without "(types)" or ";" or ","
|
|
"""
|
|
return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
|
|
|
|
# pylint: disable=E1101
|
|
def _has_caps_type_variant(self, c, ccaps):
|
|
"""
|
|
Handle situations where we can have application/ogg or video/ogg or
|
|
audio/ogg
|
|
"""
|
|
has_variant = False
|
|
media_type = re.findall("application/|video/|audio/", c)
|
|
if media_type:
|
|
media_type = media_type[0].replace('/', '')
|
|
possible_mtypes = ["application", "video", "audio"]
|
|
possible_mtypes.remove(media_type)
|
|
for tmptype in possible_mtypes:
|
|
possible_c_variant = c.replace(media_type, tmptype)
|
|
if possible_c_variant in ccaps:
|
|
self.info(
|
|
"Found %s in %s, good enough!", possible_c_variant, ccaps)
|
|
has_variant = True
|
|
|
|
return has_variant
|
|
|
|
# pylint: disable=E1101
|
|
def run_iqa_test(self, reference_file_uri):
|
|
"""
|
|
Runs IQA test if @reference_file_path exists
|
|
@test: The test to run tests on
|
|
"""
|
|
if not GstValidateBaseTestManager.has_feature('iqa'):
|
|
self.debug('Iqa element not present, not running extra test.')
|
|
return
|
|
|
|
pipeline_desc = """
|
|
uridecodebin uri=%s !
|
|
iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
|
|
uridecodebin uri=%s ! iqa.
|
|
""" % (reference_file_uri, self.dest_file)
|
|
pipeline_desc = pipeline_desc.replace("\n", "")
|
|
|
|
command = [GstValidateBaseTestManager.COMMAND] + \
|
|
shlex.split(pipeline_desc)
|
|
msg = "## Running IQA tests on results of: " \
|
|
+ "%s\n### Command: \n```\n%s\n```\n" % (
|
|
self.classname, ' '.join(command))
|
|
if not self.options.redirect_logs:
|
|
self.out.write(msg)
|
|
self.out.flush()
|
|
else:
|
|
printc(msg, Colors.OKBLUE)
|
|
|
|
self.process = subprocess.Popen(command,
|
|
stderr=self.out,
|
|
stdout=self.out,
|
|
env=self.proc_env,
|
|
cwd=self.workdir)
|
|
self.process.wait()
|
|
|
|
def check_encoded_file(self):
|
|
result_descriptor = GstValidateMediaDescriptor.new_from_uri(
|
|
self.dest_file)
|
|
if result_descriptor is None:
|
|
return (Result.FAILED, "Could not discover encoded file %s"
|
|
% self.dest_file)
|
|
|
|
duration = result_descriptor.get_duration()
|
|
orig_duration = self.media_descriptor.get_duration()
|
|
tolerance = self._duration_tolerance
|
|
|
|
if orig_duration - tolerance >= duration <= orig_duration + tolerance:
|
|
os.remove(result_descriptor.get_path())
|
|
self.add_report(
|
|
{
|
|
'type': 'report',
|
|
'issue-id': 'transcoded-file-wrong-duration',
|
|
'summary': 'The duration of a transcoded file doesn\'t match the duration of the original file',
|
|
'level': 'critical',
|
|
'detected-on': 'pipeline',
|
|
'details': "Duration of encoded file is " " wrong (%s instead of %s)" % (
|
|
utils.TIME_ARGS(duration), utils.TIME_ARGS(orig_duration))
|
|
}
|
|
)
|
|
else:
|
|
all_tracks_caps = result_descriptor.get_tracks_caps()
|
|
container_caps = result_descriptor.get_caps()
|
|
if container_caps:
|
|
all_tracks_caps.insert(0, ("container", container_caps))
|
|
|
|
for track_type, caps in all_tracks_caps:
|
|
ccaps = self._clean_caps(caps)
|
|
wanted_caps = self.combination.get_caps(track_type)
|
|
cwanted_caps = self._clean_caps(wanted_caps)
|
|
|
|
if wanted_caps is None:
|
|
os.remove(result_descriptor.get_path())
|
|
self.add_report(
|
|
{
|
|
'type': 'report',
|
|
'issue-id': 'transcoded-file-wrong-stream-type',
|
|
'summary': 'Expected stream types during transcoding do not match expectations',
|
|
'level': 'critical',
|
|
'detected-on': 'pipeline',
|
|
'details': "Found a track of type %s in the encoded files"
|
|
" but none where wanted in the encoded profile: %s" % (
|
|
track_type, self.combination)
|
|
}
|
|
)
|
|
return
|
|
|
|
for c in cwanted_caps:
|
|
if c not in ccaps:
|
|
if not self._has_caps_type_variant(c, ccaps):
|
|
os.remove(result_descriptor.get_path())
|
|
self.add_report(
|
|
{
|
|
'type': 'report',
|
|
'issue-id': 'transcoded-file-wrong-caps',
|
|
'summary': 'Expected stream caps during transcoding do not match expectations',
|
|
'level': 'critical',
|
|
'detected-on': 'pipeline',
|
|
'details': "Field: %s (from %s) not in caps of the outputted file %s" % (
|
|
wanted_caps, c, ccaps)
|
|
}
|
|
)
|
|
return
|
|
|
|
os.remove(result_descriptor.get_path())
|
|
|
|
|
|
class TestsManager(Loggable):
|
|
|
|
""" A class responsible for managing tests. """
|
|
|
|
name = "base"
|
|
loading_testsuite = None
|
|
|
|
def __init__(self):
|
|
|
|
Loggable.__init__(self)
|
|
|
|
self.tests = []
|
|
self.unwanted_tests = []
|
|
self.options = None
|
|
self.args = None
|
|
self.reporter = None
|
|
self.wanted_tests_patterns = []
|
|
self.blacklisted_tests_patterns = []
|
|
self._generators = []
|
|
self.check_testslist = True
|
|
self.all_tests = None
|
|
self.expected_issues = {}
|
|
self.blacklisted_tests = []
|
|
|
|
def init(self):
|
|
return True
|
|
|
|
def list_tests(self):
|
|
return sorted(list(self.tests), key=lambda x: x.classname)
|
|
|
|
def find_tests(self, classname):
|
|
regex = re.compile(classname)
|
|
return [test for test in self.list_tests() if regex.findall(test.classname)]
|
|
|
|
def add_expected_issues(self, expected_issues):
|
|
for bugid, failure_def in list(expected_issues.items()):
|
|
tests_regexes = []
|
|
for test_name_regex in failure_def['tests']:
|
|
regex = re.compile(test_name_regex)
|
|
tests_regexes.append(regex)
|
|
for test in self.tests:
|
|
if regex.findall(test.classname):
|
|
max_retries = failure_def.get('allow_flakiness', failure_def.get('max_retries'))
|
|
if max_retries:
|
|
test.max_retries = int(max_retries)
|
|
self.debug(f"{test.classname} allow {test.max_retries}")
|
|
else:
|
|
for issue in failure_def['issues']:
|
|
issue['bug'] = bugid
|
|
test.expected_issues.extend(failure_def['issues'])
|
|
self.debug("%s added expected issues from %s" % (
|
|
test.classname, bugid))
|
|
failure_def['tests'] = tests_regexes
|
|
|
|
self.expected_issues.update(expected_issues)
|
|
|
|
def add_test(self, test):
|
|
if test.generator is None:
|
|
test.classname = self.loading_testsuite + '.' + test.classname
|
|
|
|
for bugid, failure_def in list(self.expected_issues.items()):
|
|
failure_def['bug'] = bugid
|
|
for regex in failure_def['tests']:
|
|
if regex.findall(test.classname):
|
|
max_retries = failure_def.get('allow_flakiness', failure_def.get('max_retries'))
|
|
if max_retries:
|
|
test.max_retries = int(max_retries)
|
|
self.debug(f"{test.classname} allow {test.max_retries} retries.")
|
|
else:
|
|
for issue in failure_def['issues']:
|
|
issue['bug'] = bugid
|
|
test.expected_issues.extend(failure_def['issues'])
|
|
self.debug("%s added expected issues from %s" % (
|
|
test.classname, bugid))
|
|
|
|
if self._is_test_wanted(test):
|
|
if test not in self.tests:
|
|
self.tests.append(test)
|
|
else:
|
|
if test not in self.tests:
|
|
self.unwanted_tests.append(test)
|
|
|
|
def get_tests(self):
|
|
return self.tests
|
|
|
|
def populate_testsuite(self):
|
|
pass
|
|
|
|
def add_generators(self, generators):
|
|
"""
|
|
@generators: A list of, or one single #TestsGenerator to be used to generate tests
|
|
"""
|
|
if not isinstance(generators, list):
|
|
generators = [generators]
|
|
self._generators.extend(generators)
|
|
for generator in generators:
|
|
generator.testsuite = self.loading_testsuite
|
|
|
|
self._generators = list(set(self._generators))
|
|
|
|
def get_generators(self):
|
|
return self._generators
|
|
|
|
def _add_blacklist(self, blacklisted_tests):
|
|
if not isinstance(blacklisted_tests, list):
|
|
blacklisted_tests = [blacklisted_tests]
|
|
|
|
for patterns in blacklisted_tests:
|
|
for pattern in patterns.split(","):
|
|
self.blacklisted_tests_patterns.append(re.compile(pattern))
|
|
|
|
def set_default_blacklist(self, default_blacklist):
|
|
for test_regex, reason in default_blacklist:
|
|
if not test_regex.startswith(self.loading_testsuite + '.'):
|
|
test_regex = self.loading_testsuite + '.' + test_regex
|
|
self.blacklisted_tests.append((test_regex, reason))
|
|
self._add_blacklist(test_regex)
|
|
|
|
def add_options(self, parser):
|
|
""" Add more arguments. """
|
|
pass
|
|
|
|
def set_settings(self, options, args, reporter):
|
|
""" Set properties after options parsing. """
|
|
self.options = options
|
|
self.args = args
|
|
self.reporter = reporter
|
|
|
|
self.populate_testsuite()
|
|
|
|
if self.options.valgrind:
|
|
self.print_valgrind_bugs()
|
|
|
|
if options.wanted_tests:
|
|
for patterns in options.wanted_tests:
|
|
for pattern in patterns.split(","):
|
|
self.wanted_tests_patterns.append(re.compile(pattern))
|
|
|
|
if options.blacklisted_tests:
|
|
for patterns in options.blacklisted_tests:
|
|
self._add_blacklist(patterns)
|
|
|
|
def check_blacklists(self):
|
|
if self.options.check_bugs_status:
|
|
if not check_bugs_resolution(self.blacklisted_tests):
|
|
return False
|
|
|
|
return True
|
|
|
|
def log_blacklists(self):
|
|
if self.blacklisted_tests:
|
|
self.info("Currently 'hardcoded' %s blacklisted tests:" %
|
|
self.name)
|
|
|
|
for name, bug in self.blacklisted_tests:
|
|
if not self.options.check_bugs_status:
|
|
self.info(" + %s --> bug: %s" % (name, bug))
|
|
|
|
def check_expected_issues(self):
|
|
if not self.expected_issues or not self.options.check_bugs_status:
|
|
return True
|
|
|
|
bugs_definitions = defaultdict(list)
|
|
for bug, failure_def in list(self.expected_issues.items()):
|
|
tests_names = '|'.join(
|
|
[regex.pattern for regex in failure_def['tests']])
|
|
bugs_definitions[tests_names].extend([bug])
|
|
|
|
return check_bugs_resolution(bugs_definitions.items())
|
|
|
|
def _check_blacklisted(self, test):
|
|
for pattern in self.blacklisted_tests_patterns:
|
|
if pattern.findall(test.classname):
|
|
self.info("%s is blacklisted by %s", test.classname, pattern)
|
|
return True
|
|
|
|
return False
|
|
|
|
def _check_whitelisted(self, test):
|
|
for pattern in self.wanted_tests_patterns:
|
|
if pattern.findall(test.classname):
|
|
if self._check_blacklisted(test):
|
|
# If explicitly white listed that specific test
|
|
# bypass the blacklisting
|
|
if pattern.pattern != test.classname:
|
|
return False
|
|
return True
|
|
return False
|
|
|
|
def _check_duration(self, test):
|
|
if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
|
|
self.info("Not activating %s as its duration (%d) is superior"
|
|
" than the long limit (%d)" % (test, test.duration,
|
|
int(self.options.long_limit)))
|
|
return False
|
|
|
|
return True
|
|
|
|
def _is_test_wanted(self, test):
|
|
if self._check_whitelisted(test):
|
|
if not self._check_duration(test):
|
|
return False
|
|
return True
|
|
|
|
if self._check_blacklisted(test):
|
|
return False
|
|
|
|
if not self._check_duration(test):
|
|
return False
|
|
|
|
if not self.wanted_tests_patterns:
|
|
return True
|
|
|
|
return False
|
|
|
|
def needs_http_server(self):
|
|
return False
|
|
|
|
def print_valgrind_bugs(self):
|
|
pass
|
|
|
|
|
|
class TestsGenerator(Loggable):
|
|
|
|
def __init__(self, name, test_manager, tests=[]):
|
|
Loggable.__init__(self)
|
|
self.name = name
|
|
self.test_manager = test_manager
|
|
self.testsuite = None
|
|
self._tests = {}
|
|
for test in tests:
|
|
self._tests[test.classname] = test
|
|
|
|
def generate_tests(self, *kwargs):
|
|
"""
|
|
Method that generates tests
|
|
"""
|
|
return list(self._tests.values())
|
|
|
|
def add_test(self, test):
|
|
test.generator = self
|
|
test.classname = self.testsuite + '.' + test.classname
|
|
self._tests[test.classname] = test
|
|
|
|
|
|
class GstValidateTestsGenerator(TestsGenerator):
|
|
|
|
def populate_tests(self, uri_minfo_special_scenarios, scenarios):
|
|
pass
|
|
|
|
def generate_tests(self, uri_minfo_special_scenarios, scenarios):
|
|
self.populate_tests(uri_minfo_special_scenarios, scenarios)
|
|
return super(GstValidateTestsGenerator, self).generate_tests()
|
|
|
|
|
|
class _TestsLauncher(Loggable):
|
|
|
|
def __init__(self):
|
|
|
|
Loggable.__init__(self)
|
|
|
|
self.options = None
|
|
self.testers = []
|
|
self.tests = []
|
|
self.reporter = None
|
|
self._list_testers()
|
|
self.all_tests = None
|
|
self.wanted_tests_patterns = []
|
|
|
|
self.queue = queue.Queue()
|
|
self.jobs = []
|
|
self.total_num_tests = 0
|
|
self.current_progress = -1
|
|
self.server = None
|
|
self.httpsrv = None
|
|
self.vfb_server = None
|
|
|
|
def _list_app_dirs(self):
|
|
app_dirs = []
|
|
env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
|
|
if env_dirs is not None:
|
|
for dir_ in env_dirs.split(os.pathsep):
|
|
app_dirs.append(dir_)
|
|
|
|
return app_dirs
|
|
|
|
def _exec_app(self, app_dir, env):
|
|
try:
|
|
files = os.listdir(app_dir)
|
|
except OSError as e:
|
|
self.debug("Could not list %s: %s" % (app_dir, e))
|
|
files = []
|
|
for f in files:
|
|
if f.endswith(".py"):
|
|
exec(compile(open(os.path.join(app_dir, f)).read(),
|
|
os.path.join(app_dir, f), 'exec'), env)
|
|
|
|
def _exec_apps(self, env):
|
|
app_dirs = self._list_app_dirs()
|
|
for app_dir in app_dirs:
|
|
self._exec_app(app_dir, env)
|
|
|
|
def _list_testers(self):
|
|
env = globals().copy()
|
|
self._exec_apps(env)
|
|
|
|
testers = [i() for i in utils.get_subclasses(TestsManager, env)]
|
|
for tester in testers:
|
|
if tester.init() is True:
|
|
self.testers.append(tester)
|
|
else:
|
|
self.warning("Can not init tester: %s -- PATH is %s"
|
|
% (tester.name, os.environ["PATH"]))
|
|
|
|
def add_options(self, parser):
|
|
for tester in self.testers:
|
|
tester.add_options(parser)
|
|
|
|
def _load_testsuite(self, testsuites):
|
|
exceptions = []
|
|
for testsuite in testsuites:
|
|
try:
|
|
sys.path.insert(0, os.path.dirname(testsuite))
|
|
spec = importlib.util.spec_from_file_location(os.path.basename(testsuite).replace(".py", ""), testsuite)
|
|
module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
return (module, None)
|
|
except Exception as e:
|
|
exceptions.append("Could not load %s: %s" % (testsuite, e))
|
|
continue
|
|
finally:
|
|
sys.path.remove(os.path.dirname(testsuite))
|
|
|
|
return (None, exceptions)
|
|
|
|
def _load_testsuites(self):
|
|
testsuites = {}
|
|
for testsuite in self.options.testsuites:
|
|
if testsuite.endswith('.py') and os.path.exists(testsuite):
|
|
testsuite = os.path.abspath(os.path.expanduser(testsuite))
|
|
loaded_module = self._load_testsuite([testsuite])
|
|
else:
|
|
possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
|
|
for d in self.options.testsuites_dirs]
|
|
loaded_module = self._load_testsuite(possible_testsuites_paths)
|
|
|
|
module = loaded_module[0]
|
|
if not loaded_module[0]:
|
|
if "." in testsuite:
|
|
self.options.testsuites.append(testsuite.split('.')[0])
|
|
self.info("%s looks like a test name, trying that" %
|
|
testsuite)
|
|
self.options.wanted_tests.append(testsuite)
|
|
else:
|
|
if testsuite in testsuites:
|
|
self.info('Testuite %s was loaded previously', testsuite)
|
|
continue
|
|
printc("Could not load testsuite: %s, reasons: %s" % (
|
|
testsuite, loaded_module[1]), Colors.FAIL)
|
|
continue
|
|
|
|
if module.__name__ in testsuites:
|
|
self.info("Trying to load testsuite '%s' a second time?", module.__name__)
|
|
continue
|
|
|
|
testsuites[module.__name__] = module
|
|
if not hasattr(module, "TEST_MANAGER"):
|
|
module.TEST_MANAGER = [tester.name for tester in self.testers]
|
|
elif not isinstance(module.TEST_MANAGER, list):
|
|
module.TEST_MANAGER = [module.TEST_MANAGER]
|
|
|
|
self.options.testsuites = list(testsuites.values())
|
|
|
|
def _setup_testsuites(self):
|
|
for testsuite in self.options.testsuites:
|
|
loaded = False
|
|
wanted_test_manager = None
|
|
# TEST_MANAGER has been set in _load_testsuites()
|
|
assert hasattr(testsuite, "TEST_MANAGER")
|
|
wanted_test_manager = testsuite.TEST_MANAGER
|
|
if not isinstance(wanted_test_manager, list):
|
|
wanted_test_manager = [wanted_test_manager]
|
|
|
|
for tester in self.testers:
|
|
if wanted_test_manager is not None and \
|
|
tester.name not in wanted_test_manager:
|
|
continue
|
|
|
|
prev_testsuite_name = TestsManager.loading_testsuite
|
|
if self.options.user_paths:
|
|
TestsManager.loading_testsuite = tester.name
|
|
tester.register_defaults()
|
|
loaded = True
|
|
else:
|
|
TestsManager.loading_testsuite = testsuite.__name__
|
|
if testsuite.setup_tests(tester, self.options):
|
|
loaded = True
|
|
if prev_testsuite_name:
|
|
TestsManager.loading_testsuite = prev_testsuite_name
|
|
|
|
if not loaded:
|
|
printc("Could not load testsuite: %s"
|
|
" maybe because of missing TestManager"
|
|
% (testsuite), Colors.FAIL)
|
|
return False
|
|
|
|
def _load_config(self, options):
|
|
printc("Loading config files is DEPRECATED"
|
|
" you should use the new testsuite format now",)
|
|
|
|
for tester in self.testers:
|
|
tester.options = options
|
|
globals()[tester.name] = tester
|
|
globals()["options"] = options
|
|
c__file__ = __file__
|
|
globals()["__file__"] = self.options.config
|
|
exec(compile(open(self.options.config).read(),
|
|
self.options.config, 'exec'), globals())
|
|
globals()["__file__"] = c__file__
|
|
|
|
def set_settings(self, options, args):
|
|
if options.xunit_file:
|
|
self.reporter = reporters.XunitReporter(options)
|
|
else:
|
|
self.reporter = reporters.Reporter(options)
|
|
|
|
self.options = options
|
|
wanted_testers = None
|
|
for tester in self.testers:
|
|
if tester.name in args:
|
|
wanted_testers = tester.name
|
|
|
|
if wanted_testers:
|
|
testers = self.testers
|
|
self.testers = []
|
|
for tester in testers:
|
|
if tester.name in args:
|
|
self.testers.append(tester)
|
|
args.remove(tester.name)
|
|
|
|
if options.config:
|
|
self._load_config(options)
|
|
|
|
self._load_testsuites()
|
|
if not self.options.testsuites:
|
|
printc("Not testsuite loaded!", Colors.FAIL)
|
|
return False
|
|
|
|
for tester in self.testers:
|
|
tester.set_settings(options, args, self.reporter)
|
|
|
|
if not options.config and options.testsuites:
|
|
if self._setup_testsuites() is False:
|
|
return False
|
|
|
|
if self.options.check_bugs_status:
|
|
printc("-> Checking bugs resolution... ", end='')
|
|
|
|
for tester in self.testers:
|
|
if not tester.check_blacklists():
|
|
return False
|
|
|
|
tester.log_blacklists()
|
|
|
|
if not tester.check_expected_issues():
|
|
return False
|
|
|
|
if self.options.check_bugs_status:
|
|
printc("OK", Colors.OKGREEN)
|
|
|
|
if self.needs_http_server() or options.httponly is True:
|
|
self.httpsrv = HTTPServer(options)
|
|
self.httpsrv.start()
|
|
|
|
if options.no_display:
|
|
self.vfb_server = get_virual_frame_buffer_server(options)
|
|
res = self.vfb_server.start()
|
|
if res[0] is False:
|
|
printc("Could not start virtual frame server: %s" % res[1],
|
|
Colors.FAIL)
|
|
return False
|
|
os.environ["DISPLAY"] = self.vfb_server.display_id
|
|
|
|
return True
|
|
|
|
def _check_tester_has_other_testsuite(self, testsuite, tester):
|
|
if tester.name != testsuite.TEST_MANAGER[0]:
|
|
return True
|
|
|
|
for t in self.options.testsuites:
|
|
if t != testsuite:
|
|
for other_testmanager in t.TEST_MANAGER:
|
|
if other_testmanager == tester.name:
|
|
return True
|
|
|
|
return False
|
|
|
|
def _check_defined_tests(self, tester, tests):
|
|
if self.options.blacklisted_tests or self.options.wanted_tests:
|
|
return
|
|
|
|
tests_names = [test.classname for test in tests]
|
|
testlist_changed = False
|
|
for testsuite in self.options.testsuites:
|
|
if not self._check_tester_has_other_testsuite(testsuite, tester) \
|
|
and tester.check_testslist:
|
|
try:
|
|
testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
|
|
'r+')
|
|
|
|
know_tests = testlist_file.read().split("\n")
|
|
testlist_file.close()
|
|
|
|
testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
|
|
'w')
|
|
except IOError:
|
|
continue
|
|
|
|
optional_out = []
|
|
for test in know_tests:
|
|
if test and test.strip('~') not in tests_names:
|
|
if not test.startswith('~'):
|
|
testlist_changed = True
|
|
printc("Test %s Not in testsuite %s anymore"
|
|
% (test, testsuite.__file__), Colors.FAIL)
|
|
else:
|
|
optional_out.append((test, None))
|
|
|
|
tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
|
|
key=lambda x: x[0].strip('~'))
|
|
|
|
for tname, test in tests_names:
|
|
if test and test.optional:
|
|
tname = '~' + tname
|
|
testlist_file.write("%s\n" % (tname))
|
|
if tname and tname not in know_tests:
|
|
printc("Test %s is NEW in testsuite %s"
|
|
% (tname, testsuite.__file__),
|
|
Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
|
|
testlist_changed = True
|
|
|
|
testlist_file.close()
|
|
break
|
|
|
|
return testlist_changed
|
|
|
|
def _split_tests(self, num_groups):
|
|
groups = [[] for x in range(num_groups)]
|
|
group = cycle(groups)
|
|
for test in self.tests:
|
|
next(group).append(test)
|
|
return groups
|
|
|
|
def list_tests(self):
|
|
for tester in self.testers:
|
|
if not self._tester_needed(tester):
|
|
continue
|
|
|
|
tests = tester.list_tests()
|
|
if self._check_defined_tests(tester, tests) and \
|
|
self.options.fail_on_testlist_change:
|
|
raise RuntimeError("Unexpected new test in testsuite.")
|
|
|
|
self.tests.extend(tests)
|
|
self.tests.sort(key=lambda test: test.classname)
|
|
|
|
if self.options.num_parts < 1:
|
|
raise RuntimeError("Tests must be split in positive number of parts.")
|
|
if self.options.num_parts > len(self.tests):
|
|
raise RuntimeError("Cannot have more parts then there exist tests.")
|
|
if self.options.part_index < 1 or self.options.part_index > self.options.num_parts:
|
|
raise RuntimeError("Part index is out of range")
|
|
|
|
self.tests = self._split_tests(self.options.num_parts)[self.options.part_index - 1]
|
|
return self.tests
|
|
|
|
def _tester_needed(self, tester):
|
|
for testsuite in self.options.testsuites:
|
|
if tester.name in testsuite.TEST_MANAGER:
|
|
return True
|
|
return False
|
|
|
|
def server_wrapper(self, ready):
|
|
self.server = GstValidateTCPServer(
|
|
('localhost', 0), GstValidateListener)
|
|
self.server.socket.settimeout(None)
|
|
self.server.launcher = self
|
|
self.serverport = self.server.socket.getsockname()[1]
|
|
self.info("%s server port: %s" % (self, self.serverport))
|
|
ready.set()
|
|
|
|
self.server.serve_forever(poll_interval=0.05)
|
|
|
|
def _start_server(self):
|
|
self.info("Starting TCP Server")
|
|
ready = threading.Event()
|
|
self.server_thread = threading.Thread(target=self.server_wrapper,
|
|
kwargs={'ready': ready})
|
|
self.server_thread.start()
|
|
ready.wait()
|
|
os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
|
|
|
|
def _stop_server(self):
|
|
if self.server:
|
|
self.server.shutdown()
|
|
self.server_thread.join()
|
|
self.server.server_close()
|
|
self.server = None
|
|
|
|
def test_wait(self):
|
|
while True:
|
|
# Check process every second for timeout
|
|
try:
|
|
self.queue.get(timeout=1)
|
|
except queue.Empty:
|
|
pass
|
|
|
|
for test in self.jobs:
|
|
if test.process_update():
|
|
self.jobs.remove(test)
|
|
return test
|
|
|
|
def tests_wait(self):
|
|
try:
|
|
test = self.test_wait()
|
|
test.check_results()
|
|
except KeyboardInterrupt:
|
|
for test in self.jobs:
|
|
test.kill_subprocess()
|
|
raise
|
|
|
|
return test
|
|
|
|
def start_new_job(self, tests_left):
|
|
try:
|
|
test = tests_left.pop(0)
|
|
except IndexError:
|
|
return False
|
|
|
|
test.test_start(self.queue)
|
|
|
|
self.jobs.append(test)
|
|
|
|
return True
|
|
|
|
def print_result(self, current_test_num, test, total_num_tests, retry_on_failures=False):
|
|
if test.result not in [Result.PASSED, Result.KNOWN_ERROR] and (not retry_on_failures or test.max_retries):
|
|
printc(str(test), color=utils.get_color_for_result(test.result))
|
|
|
|
length = 80
|
|
progress = int(length * current_test_num // total_num_tests)
|
|
bar = '█' * progress + '-' * (length - progress)
|
|
if is_tty():
|
|
printc('\r|%s| [%s/%s]' % (bar, current_test_num, total_num_tests), end='\r')
|
|
else:
|
|
if progress > self.current_progress:
|
|
self.current_progress = progress
|
|
printc('|%s| [%s/%s]' % (bar, current_test_num, total_num_tests))
|
|
|
|
def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False, total_num_tests=None):
|
|
if not self.all_tests:
|
|
self.all_tests = self.list_tests()
|
|
|
|
if not running_tests:
|
|
running_tests = self.tests
|
|
|
|
self.reporter.init_timer()
|
|
alone_tests = []
|
|
tests = []
|
|
for test in running_tests:
|
|
if test.is_parallel and not all_alone:
|
|
tests.append(test)
|
|
else:
|
|
alone_tests.append(test)
|
|
|
|
# use max to defend against the case where all tests are alone_tests
|
|
max_num_jobs = max(min(self.options.num_jobs, len(tests)), 1)
|
|
jobs_running = 0
|
|
|
|
if self.options.forever and len(tests) < self.options.num_jobs and len(tests):
|
|
max_num_jobs = self.options.num_jobs
|
|
copied = []
|
|
i = 0
|
|
while (len(tests) + len(copied)) < max_num_jobs:
|
|
copied.append(tests[i].copy(len(copied) + 1))
|
|
|
|
i += 1
|
|
if i >= len(tests):
|
|
i = 0
|
|
tests += copied
|
|
self.tests += copied
|
|
|
|
self.total_num_tests = len(self.all_tests)
|
|
prefix = "=> Re-r" if total_num_tests else "R"
|
|
total_num_tests = total_num_tests if total_num_tests else self.total_num_tests
|
|
printc(f"\n{prefix}unning {total_num_tests} tests...", color=Colors.HEADER)
|
|
# if order of test execution doesn't matter, shuffle
|
|
# the order to optimize cpu usage
|
|
if self.options.shuffle:
|
|
random.shuffle(tests)
|
|
random.shuffle(alone_tests)
|
|
|
|
current_test_num = 1
|
|
to_retry = []
|
|
for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
|
|
tests_left = list(tests)
|
|
for i in range(num_jobs):
|
|
if not self.start_new_job(tests_left):
|
|
break
|
|
jobs_running += 1
|
|
|
|
while jobs_running != 0:
|
|
test = self.tests_wait()
|
|
jobs_running -= 1
|
|
current_test_num += 1
|
|
res = test.test_end(retry_on_failures=retry_on_failures)
|
|
to_report = True
|
|
if res not in [Result.PASSED, Result.SKIPPED, Result.KNOWN_ERROR]:
|
|
if self.options.forever or self.options.fatal_error:
|
|
self.print_result(current_test_num - 1, test, retry_on_failures=retry_on_failures,
|
|
total_num_tests=total_num_tests)
|
|
self.reporter.after_test(test)
|
|
return False
|
|
|
|
if retry_on_failures or test.max_retries and not self.options.no_retry_on_failures:
|
|
if not self.options.redirect_logs:
|
|
test.copy_logfiles()
|
|
to_retry.append(test)
|
|
|
|
# Not adding to final report if flakiness is tolerated
|
|
if test.max_retries:
|
|
test.max_retries -= 1
|
|
to_report = False
|
|
self.print_result(current_test_num - 1, test,
|
|
retry_on_failures=retry_on_failures,
|
|
total_num_tests=total_num_tests)
|
|
if to_report:
|
|
self.reporter.after_test(test)
|
|
if self.start_new_job(tests_left):
|
|
jobs_running += 1
|
|
|
|
if to_retry:
|
|
printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
|
|
for test in to_retry:
|
|
test.clean()
|
|
printc(f' * {test.classname}')
|
|
printc('')
|
|
self.current_progress = -1
|
|
res = self._run_tests(
|
|
to_retry,
|
|
all_alone=True,
|
|
retry_on_failures=False,
|
|
total_num_tests=len(to_retry),
|
|
)
|
|
|
|
return res
|
|
|
|
return True
|
|
|
|
def clean_tests(self, stop_server=False):
|
|
for test in self.tests:
|
|
test.clean()
|
|
if stop_server:
|
|
self._stop_server()
|
|
|
|
def run_tests(self):
|
|
r = 0
|
|
try:
|
|
self._start_server()
|
|
if self.options.forever:
|
|
r = 1
|
|
while True:
|
|
self.current_progress = -1
|
|
printc("-> Iteration %d" % r, end='\r')
|
|
|
|
if not self._run_tests():
|
|
break
|
|
r += 1
|
|
self.clean_tests()
|
|
msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
|
|
printc(msg, end="\r")
|
|
|
|
return False
|
|
elif self.options.n_runs:
|
|
res = True
|
|
for r in range(self.options.n_runs):
|
|
self.current_progress = -1
|
|
printc("-> Iteration %d" % r, end='\r')
|
|
if not self._run_tests(retry_on_failures=self.options.retry_on_failures):
|
|
res = False
|
|
printc("ERROR", Colors.FAIL, end="\r")
|
|
else:
|
|
printc("OK", Colors.OKGREEN, end="\r")
|
|
self.clean_tests()
|
|
|
|
return res
|
|
else:
|
|
return self._run_tests(retry_on_failures=self.options.retry_on_failures)
|
|
finally:
|
|
if self.options.forever:
|
|
printc("\n-> Ran %d times" % r)
|
|
if self.httpsrv:
|
|
self.httpsrv.stop()
|
|
if self.vfb_server:
|
|
self.vfb_server.stop()
|
|
self.clean_tests(True)
|
|
|
|
def final_report(self):
|
|
return self.reporter.final_report()
|
|
|
|
def needs_http_server(self):
|
|
for tester in self.testers:
|
|
if tester.needs_http_server():
|
|
return True
|
|
|
|
|
|
class NamedDic(object):
|
|
|
|
def __init__(self, props):
|
|
if props:
|
|
for name, value in props.items():
|
|
setattr(self, name, value)
|
|
|
|
|
|
class Scenario(object):
|
|
|
|
def __init__(self, name, props, path=None):
|
|
self.name = name
|
|
self.path = path
|
|
|
|
for prop, value in props:
|
|
setattr(self, prop.replace("-", "_"), value)
|
|
|
|
def get_execution_name(self):
|
|
if self.path is not None:
|
|
return self.path
|
|
else:
|
|
return self.name
|
|
|
|
def seeks(self):
|
|
if hasattr(self, "seek"):
|
|
return bool(self.seek)
|
|
|
|
return False
|
|
|
|
def needs_clock_sync(self):
|
|
if hasattr(self, "need_clock_sync"):
|
|
return bool(self.need_clock_sync)
|
|
|
|
return False
|
|
|
|
def needs_live_content(self):
|
|
# Scenarios that can only be used on live content
|
|
if hasattr(self, "live_content_required"):
|
|
return bool(self.live_content_required)
|
|
return False
|
|
|
|
def compatible_with_live_content(self):
|
|
# if a live content is required it's implicitly compatible with
|
|
# live content
|
|
if self.needs_live_content():
|
|
return True
|
|
if hasattr(self, "live_content_compatible"):
|
|
return bool(self.live_content_compatible)
|
|
return False
|
|
|
|
def get_min_media_duration(self):
|
|
if hasattr(self, "min_media_duration"):
|
|
return float(self.min_media_duration)
|
|
|
|
return 0
|
|
|
|
def does_reverse_playback(self):
|
|
if hasattr(self, "reverse_playback"):
|
|
return bool(self.reverse_playback)
|
|
|
|
return False
|
|
|
|
def get_duration(self):
|
|
try:
|
|
return float(getattr(self, "duration"))
|
|
except AttributeError:
|
|
return 0
|
|
|
|
def get_min_tracks(self, track_type):
|
|
try:
|
|
return int(getattr(self, "min_%s_track" % track_type))
|
|
except AttributeError:
|
|
return 0
|
|
|
|
def __repr__(self):
|
|
return "<Scenario %s>" % self.name
|
|
|
|
|
|
class ScenarioManager(Loggable):
|
|
_instance = None
|
|
system_scenarios = []
|
|
special_scenarios = {}
|
|
|
|
FILE_EXTENSION = "scenario"
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if not cls._instance:
|
|
cls._instance = super(ScenarioManager, cls).__new__(
|
|
cls, *args, **kwargs)
|
|
cls._instance.config = None
|
|
cls._instance.discovered = False
|
|
Loggable.__init__(cls._instance)
|
|
|
|
return cls._instance
|
|
|
|
def find_special_scenarios(self, mfile):
|
|
scenarios = []
|
|
mfile_bname = os.path.basename(mfile)
|
|
|
|
for f in os.listdir(os.path.dirname(mfile)):
|
|
if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
|
|
scenarios.append(os.path.join(os.path.dirname(mfile), f))
|
|
|
|
if scenarios:
|
|
scenarios = self.discover_scenarios(scenarios, mfile)
|
|
|
|
return scenarios
|
|
|
|
def discover_scenarios(self, scenario_paths=[], mfile=None):
|
|
"""
|
|
Discover scenarios specified in scenario_paths or the default ones
|
|
if nothing specified there
|
|
"""
|
|
scenarios = []
|
|
scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
|
|
log_path = os.path.join(self.config.logsdir, "scenarios_discovery.log")
|
|
logs = open(log_path, 'w')
|
|
|
|
try:
|
|
command = [GstValidateBaseTestManager.COMMAND,
|
|
"--scenarios-defs-output-file", scenario_defs]
|
|
command.extend(scenario_paths)
|
|
subprocess.check_call(command, stdout=logs, stderr=logs)
|
|
except subprocess.CalledProcessError as e:
|
|
self.error(e)
|
|
self.error('See %s' % log_path)
|
|
pass
|
|
|
|
config = configparser.RawConfigParser()
|
|
f = open(scenario_defs)
|
|
config.readfp(f)
|
|
|
|
for section in config.sections():
|
|
name = None
|
|
if scenario_paths:
|
|
for scenario_path in scenario_paths:
|
|
if section == scenario_path:
|
|
if mfile is None:
|
|
name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
|
|
path = scenario_path
|
|
else:
|
|
# The real name of the scenario is:
|
|
# filename.REALNAME.scenario
|
|
name = scenario_path.replace(mfile + ".", "").replace(
|
|
"." + self.FILE_EXTENSION, "")
|
|
path = scenario_path
|
|
break
|
|
else:
|
|
name = os.path.basename(section).replace("." + self.FILE_EXTENSION, "")
|
|
path = None
|
|
|
|
assert name
|
|
|
|
props = config.items(section)
|
|
scenario = Scenario(name, props, path)
|
|
if scenario_paths:
|
|
self.special_scenarios[path] = scenario
|
|
scenarios.append(scenario)
|
|
|
|
if not scenario_paths:
|
|
self.discovered = True
|
|
self.system_scenarios.extend(scenarios)
|
|
|
|
return scenarios
|
|
|
|
def get_scenario(self, name):
|
|
if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
|
|
scenario = self.special_scenarios.get(name)
|
|
if scenario:
|
|
return scenario
|
|
|
|
scenarios = self.discover_scenarios([name])
|
|
self.special_scenarios[name] = scenarios
|
|
|
|
if scenarios:
|
|
return scenarios[0]
|
|
|
|
if self.discovered is False:
|
|
self.discover_scenarios()
|
|
|
|
if name is None:
|
|
return self.system_scenarios
|
|
|
|
try:
|
|
return [scenario for scenario in self.system_scenarios if scenario.name == name][0]
|
|
except IndexError:
|
|
self.warning("Scenario: %s not found" % name)
|
|
return None
|
|
|
|
|
|
class GstValidateBaseTestManager(TestsManager):
|
|
scenarios_manager = ScenarioManager()
|
|
features_cache = {}
|
|
|
|
def __init__(self):
|
|
super(GstValidateBaseTestManager, self).__init__()
|
|
self._scenarios = []
|
|
self._encoding_formats = []
|
|
|
|
@classmethod
|
|
def update_commands(cls, extra_paths=None):
|
|
for varname, cmd in {'': 'gst-validate',
|
|
'TRANSCODING_': 'gst-validate-transcoding',
|
|
'MEDIA_CHECK_': 'gst-validate-media-check',
|
|
'RTSP_SERVER_': 'gst-validate-rtsp-server',
|
|
'INSPECT_': 'gst-inspect'}.items():
|
|
setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
|
|
|
|
@classmethod
|
|
def has_feature(cls, featurename):
|
|
try:
|
|
return cls.features_cache[featurename]
|
|
except KeyError:
|
|
pass
|
|
|
|
try:
|
|
subprocess.check_output([cls.INSPECT_COMMAND, featurename])
|
|
res = True
|
|
except subprocess.CalledProcessError:
|
|
res = False
|
|
|
|
cls.features_cache[featurename] = res
|
|
return res
|
|
|
|
def add_scenarios(self, scenarios):
|
|
"""
|
|
@scenarios A list or a unic scenario name(s) to be run on the tests.
|
|
They are just the default scenarios, and then depending on
|
|
the TestsGenerator to be used you can have more fine grained
|
|
control on what to be run on each series of tests.
|
|
"""
|
|
if isinstance(scenarios, list):
|
|
self._scenarios.extend(scenarios)
|
|
else:
|
|
self._scenarios.append(scenarios)
|
|
|
|
self._scenarios = list(set(self._scenarios))
|
|
|
|
def set_scenarios(self, scenarios):
|
|
"""
|
|
Override the scenarios
|
|
"""
|
|
self._scenarios = []
|
|
self.add_scenarios(scenarios)
|
|
|
|
def get_scenarios(self):
|
|
return self._scenarios
|
|
|
|
def add_encoding_formats(self, encoding_formats):
|
|
"""
|
|
:param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
|
|
formats for transcoding test.
|
|
They are just the default encoding formats, and then depending on
|
|
the TestsGenerator to be used you can have more fine grained
|
|
control on what to be run on each series of tests.
|
|
"""
|
|
if isinstance(encoding_formats, list):
|
|
self._encoding_formats.extend(encoding_formats)
|
|
else:
|
|
self._encoding_formats.append(encoding_formats)
|
|
|
|
self._encoding_formats = list(set(self._encoding_formats))
|
|
|
|
def get_encoding_formats(self):
|
|
return self._encoding_formats
|
|
|
|
|
|
GstValidateBaseTestManager.update_commands()
|
|
|
|
|
|
class MediaDescriptor(Loggable):
|
|
|
|
def __init__(self):
|
|
Loggable.__init__(self)
|
|
|
|
def get_path(self):
|
|
raise NotImplemented
|
|
|
|
def has_frames(self):
|
|
return False
|
|
|
|
def get_framerate(self):
|
|
for ttype, caps_str in self.get_tracks_caps():
|
|
if ttype != "video":
|
|
continue
|
|
|
|
caps = utils.GstCaps.new_from_str(caps_str)
|
|
if not caps:
|
|
self.warning("Could not create caps for %s" % caps_str)
|
|
continue
|
|
|
|
framerate = caps[0].get("framerate")
|
|
if framerate:
|
|
return framerate
|
|
|
|
return Fraction(0, 1)
|
|
|
|
def get_media_filepath(self):
|
|
raise NotImplemented
|
|
|
|
def skip_parsers(self):
|
|
return False
|
|
|
|
def get_caps(self):
|
|
raise NotImplemented
|
|
|
|
def get_uri(self):
|
|
raise NotImplemented
|
|
|
|
def get_duration(self):
|
|
raise NotImplemented
|
|
|
|
def get_protocol(self):
|
|
raise NotImplemented
|
|
|
|
def is_seekable(self):
|
|
raise NotImplemented
|
|
|
|
def is_live(self):
|
|
raise NotImplemented
|
|
|
|
def is_image(self):
|
|
raise NotImplemented
|
|
|
|
def get_num_tracks(self, track_type):
|
|
raise NotImplemented
|
|
|
|
def get_tracks_caps(self):
|
|
return []
|
|
|
|
def can_play_reverse(self):
|
|
raise NotImplemented
|
|
|
|
def prerrols(self):
|
|
return True
|
|
|
|
def is_compatible(self, scenario):
|
|
if scenario is None:
|
|
return True
|
|
|
|
if scenario.seeks() and (not self.is_seekable() or self.is_image()):
|
|
self.debug("Do not run %s as %s does not support seeking",
|
|
scenario, self.get_uri())
|
|
return False
|
|
|
|
if self.is_image() and scenario.needs_clock_sync():
|
|
self.debug("Do not run %s as %s is an image",
|
|
scenario, self.get_uri())
|
|
return False
|
|
|
|
if not self.can_play_reverse() and scenario.does_reverse_playback():
|
|
return False
|
|
|
|
if not self.is_live() and scenario.needs_live_content():
|
|
self.debug("Do not run %s as %s is not a live content",
|
|
scenario, self.get_uri())
|
|
return False
|
|
|
|
if self.is_live() and not scenario.compatible_with_live_content():
|
|
self.debug("Do not run %s as %s is a live content",
|
|
scenario, self.get_uri())
|
|
return False
|
|
|
|
if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
|
|
return False
|
|
|
|
if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
|
|
self.debug(
|
|
"Do not run %s as %s is too short (%i < min media duation : %i",
|
|
scenario, self.get_uri(),
|
|
self.get_duration() / GST_SECOND,
|
|
scenario.get_min_media_duration())
|
|
return False
|
|
|
|
for track_type in ['audio', 'subtitle', 'video']:
|
|
if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
|
|
self.debug("%s -- %s | At least %s %s track needed < %s"
|
|
% (scenario, self.get_uri(), track_type,
|
|
scenario.get_min_tracks(track_type),
|
|
self.get_num_tracks(track_type)))
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
class GstValidateMediaDescriptor(MediaDescriptor):
|
|
# Some extension file for discovering results
|
|
SKIPPED_MEDIA_INFO_EXT = "media_info.skipped"
|
|
MEDIA_INFO_EXT = "media_info"
|
|
PUSH_MEDIA_INFO_EXT = "media_info.push"
|
|
STREAM_INFO_EXT = "stream_info"
|
|
|
|
__all_descriptors = {}
|
|
|
|
@classmethod
|
|
def get(cls, xml_path):
|
|
if xml_path in cls.__all_descriptors:
|
|
return cls.__all_descriptors[xml_path]
|
|
return GstValidateMediaDescriptor(xml_path)
|
|
|
|
def __init__(self, xml_path):
|
|
super(GstValidateMediaDescriptor, self).__init__()
|
|
|
|
self._media_file_path = None
|
|
main_descriptor = self.__all_descriptors.get(xml_path)
|
|
if main_descriptor:
|
|
self._copy_data_from_main(main_descriptor)
|
|
else:
|
|
self.__all_descriptors[xml_path] = self
|
|
|
|
self._xml_path = xml_path
|
|
try:
|
|
media_xml = ET.parse(xml_path).getroot()
|
|
except xml.etree.ElementTree.ParseError:
|
|
printc("Could not parse %s" % xml_path,
|
|
Colors.FAIL)
|
|
raise
|
|
self._extract_data(media_xml)
|
|
|
|
self.set_protocol(urllib.parse.urlparse(self.get_uri()).scheme)
|
|
|
|
def skip_parsers(self):
|
|
return self._skip_parsers
|
|
|
|
def has_frames(self):
|
|
return self._has_frames
|
|
|
|
def _copy_data_from_main(self, main_descriptor):
|
|
for attr in main_descriptor.__dict__.keys():
|
|
setattr(self, attr, getattr(main_descriptor, attr))
|
|
|
|
def _extract_data(self, media_xml):
|
|
# Extract the information we need from the xml
|
|
self._caps = media_xml.findall("streams")[0].attrib["caps"]
|
|
self._track_caps = []
|
|
try:
|
|
streams = media_xml.findall("streams")[0].findall("stream")
|
|
except IndexError:
|
|
pass
|
|
else:
|
|
for stream in streams:
|
|
self._track_caps.append(
|
|
(stream.attrib["type"], stream.attrib["caps"]))
|
|
|
|
self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
|
|
self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
|
|
self._duration = int(media_xml.attrib["duration"])
|
|
self._uri = media_xml.attrib["uri"]
|
|
parsed_uri = urllib.parse.urlparse(self.get_uri())
|
|
self._protocol = media_xml.get("protocol", parsed_uri.scheme)
|
|
if parsed_uri.scheme == "file":
|
|
if not os.path.exists(parsed_uri.path) and os.path.exists(self.get_media_filepath()):
|
|
self._uri = "file://" + self.get_media_filepath()
|
|
elif parsed_uri.scheme == Protocols.IMAGESEQUENCE:
|
|
self._media_file_path = os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(parsed_uri.path))
|
|
self._uri = parsed_uri._replace(path=os.path.join(os.path.dirname(self.__cleanup_media_info_ext()), os.path.basename(self._media_file_path))).geturl()
|
|
self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
|
|
self._is_live = media_xml.get("live", "false").lower() == "true"
|
|
self._is_image = False
|
|
for stream in media_xml.findall("streams")[0].findall("stream"):
|
|
if stream.attrib["type"] == "image":
|
|
self._is_image = True
|
|
self._track_types = []
|
|
for stream in media_xml.findall("streams")[0].findall("stream"):
|
|
self._track_types.append(stream.attrib["type"])
|
|
|
|
def __cleanup_media_info_ext(self):
|
|
for ext in [self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT,
|
|
self.SKIPPED_MEDIA_INFO_EXT, ]:
|
|
if self._xml_path.endswith(ext):
|
|
return self._xml_path[:len(self._xml_path) - (len(ext) + 1)]
|
|
|
|
assert "Not reached" == None # noqa
|
|
|
|
@staticmethod
|
|
def new_from_uri(uri, verbose=False, include_frames=False, is_push=False, is_skipped=False):
|
|
"""
|
|
include_frames = 0 # Never
|
|
include_frames = 1 # always
|
|
include_frames = 2 # if previous file included them
|
|
|
|
"""
|
|
media_path = utils.url2path(uri)
|
|
|
|
ext = GstValidateMediaDescriptor.MEDIA_INFO_EXT
|
|
if is_push:
|
|
ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT
|
|
elif is_skipped:
|
|
ext = GstValidateMediaDescriptor.SKIPPED_MEDIA_INFO_EXT
|
|
descriptor_path = "%s.%s" % (media_path, ext)
|
|
args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
|
|
if include_frames == 2:
|
|
try:
|
|
media_xml = ET.parse(descriptor_path).getroot()
|
|
prev_uri = urllib.parse.urlparse(media_xml.attrib['uri'])
|
|
if prev_uri.scheme == Protocols.IMAGESEQUENCE:
|
|
parsed_uri = urllib.parse.urlparse(uri)
|
|
uri = prev_uri._replace(path=os.path.join(os.path.dirname(parsed_uri.path), os.path.basename(prev_uri.path))).geturl()
|
|
include_frames = bool(int(media_xml.attrib["frame-detection"]))
|
|
if bool(int(media_xml.attrib.get("skip-parsers", 0))):
|
|
args.append("--skip-parsers")
|
|
except FileNotFoundError:
|
|
pass
|
|
else:
|
|
include_frames = bool(include_frames)
|
|
args.append(uri)
|
|
|
|
args.extend(["--output-file", descriptor_path])
|
|
if include_frames:
|
|
args.extend(["--full"])
|
|
|
|
if verbose:
|
|
printc("Generating media info for %s\n"
|
|
" Command: '%s'" % (media_path, ' '.join(args)),
|
|
Colors.OKBLUE)
|
|
|
|
try:
|
|
subprocess.check_output(args, stderr=open(os.devnull))
|
|
except subprocess.CalledProcessError as e:
|
|
if verbose:
|
|
printc("Result: Failed", Colors.FAIL)
|
|
else:
|
|
loggable.warning("GstValidateMediaDescriptor",
|
|
"Exception: %s" % e)
|
|
return None
|
|
|
|
if verbose:
|
|
printc("Result: Passed", Colors.OKGREEN)
|
|
|
|
try:
|
|
return GstValidateMediaDescriptor(descriptor_path)
|
|
except (IOError, xml.etree.ElementTree.ParseError):
|
|
return None
|
|
|
|
def get_path(self):
|
|
return self._xml_path
|
|
|
|
def need_clock_sync(self):
|
|
return Protocols.needs_clock_sync(self.get_protocol())
|
|
|
|
def get_media_filepath(self):
|
|
if self._media_file_path is None:
|
|
self._media_file_path = self.__cleanup_media_info_ext()
|
|
return self._media_file_path
|
|
|
|
def get_caps(self):
|
|
return self._caps
|
|
|
|
def get_tracks_caps(self):
|
|
return self._track_caps
|
|
|
|
def get_uri(self):
|
|
return self._uri
|
|
|
|
def get_duration(self):
|
|
return self._duration
|
|
|
|
def set_protocol(self, protocol):
|
|
if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
|
|
self._protocol = Protocols.PUSHFILE
|
|
else:
|
|
self._protocol = protocol
|
|
|
|
def get_protocol(self):
|
|
return self._protocol
|
|
|
|
def is_seekable(self):
|
|
return self._is_seekable
|
|
|
|
def is_live(self):
|
|
return self._is_live
|
|
|
|
def can_play_reverse(self):
|
|
return True
|
|
|
|
def is_image(self):
|
|
return self._is_image
|
|
|
|
def get_num_tracks(self, track_type):
|
|
n = 0
|
|
for t in self._track_types:
|
|
if t == track_type:
|
|
n += 1
|
|
|
|
return n
|
|
|
|
def get_clean_name(self):
|
|
name = os.path.basename(self.get_path())
|
|
regex = '|'.join(['\\.%s$' % ext for ext in [self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]])
|
|
name = re.sub(regex, "", name)
|
|
|
|
return name.replace('.', "_")
|
|
|
|
|
|
class MediaFormatCombination(object):
|
|
FORMATS = {"aac": "audio/mpeg,mpegversion=4", # Audio
|
|
"ac3": "audio/x-ac3",
|
|
"vorbis": "audio/x-vorbis",
|
|
"mp3": "audio/mpeg,mpegversion=1,layer=3",
|
|
"opus": "audio/x-opus",
|
|
"rawaudio": "audio/x-raw",
|
|
|
|
# Video
|
|
"h264": "video/x-h264",
|
|
"h265": "video/x-h265",
|
|
"vp8": "video/x-vp8",
|
|
"vp9": "video/x-vp9",
|
|
"theora": "video/x-theora",
|
|
"prores": "video/x-prores",
|
|
"jpeg": "image/jpeg",
|
|
|
|
# Containers
|
|
"webm": "video/webm",
|
|
"ogg": "application/ogg",
|
|
"mkv": "video/x-matroska",
|
|
"mp4": "video/quicktime,variant=iso;",
|
|
"quicktime": "video/quicktime;"}
|
|
|
|
def __str__(self):
|
|
return "%s and %s in %s" % (self.audio, self.video, self.container)
|
|
|
|
def __init__(self, container, audio, video, duration_factor=1,
|
|
video_restriction=None, audio_restriction=None):
|
|
"""
|
|
Describes a media format to be used for transcoding tests.
|
|
|
|
:param container: A string defining the container format to be used, must bin in self.FORMATS
|
|
:param audio: A string defining the audio format to be used, must bin in self.FORMATS
|
|
:param video: A string defining the video format to be used, must bin in self.FORMATS
|
|
"""
|
|
self.container = container
|
|
self.audio = audio
|
|
self.video = video
|
|
self.video_restriction = video_restriction
|
|
self.audio_restriction = audio_restriction
|
|
|
|
def get_caps(self, track_type):
|
|
try:
|
|
return self.FORMATS[self.__dict__[track_type]]
|
|
except KeyError:
|
|
return None
|
|
|
|
def get_audio_caps(self):
|
|
return self.get_caps("audio")
|
|
|
|
def get_video_caps(self):
|
|
return self.get_caps("video")
|
|
|
|
def get_muxer_caps(self):
|
|
return self.get_caps("container")
|