#!/usr/bin/env python3 # # Copyright (c) 2013,Thibault Saunier # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Boston, MA 02110-1301, USA. """ Some utilies. """ try: import config except ImportError: from . import config import json import numbers import os import platform import re import shutil import shlex import signal import subprocess import sys import tempfile import time import urllib.request import urllib.error import urllib.parse from .loggable import Loggable, warning from operator import itemgetter from xml.etree import ElementTree from collections import defaultdict from fractions import Fraction GST_SECOND = int(1000000000) DEFAULT_TIMEOUT = 30 DEFAULT_MAIN_DIR = os.path.join(config.BUILDDIR, "subprojects", "gst-integration-testsuites") DEFAULT_GST_QA_ASSETS = os.path.join(config.SRCDIR, "subprojects", "gst-integration-testsuites") USING_SUBPROJECT = os.path.exists(os.path.join(config.BUILDDIR, "subprojects", "gst-integration-testsuites")) if not USING_SUBPROJECT: DEFAULT_MAIN_DIR = os.path.join(os.path.expanduser("~"), "gst-validate") DEFAULT_GST_QA_ASSETS = os.path.join(DEFAULT_MAIN_DIR, "gst-integration-testsuites") DEFAULT_MAIN_DIR = os.environ.get('GST_VALIDATE_LAUNCHER_MAIN_DIR', DEFAULT_MAIN_DIR) DEFAULT_TESTSUITES_DIRS = [os.path.join(DEFAULT_GST_QA_ASSETS, "testsuites")] DISCOVERER_COMMAND = "gst-discoverer-1.0" # Use to set the duration from which a test is considered as being 'long' LONG_TEST = 40 class Result(object): NOT_RUN = "Not run" FAILED = "Failed" TIMEOUT = "Timeout" PASSED = "Passed" SKIPPED = "Skipped" KNOWN_ERROR = "Known error" class Protocols(object): HTTP = "http" FILE = "file" PUSHFILE = "pushfile" HLS = "hls" DASH = "dash" RTSP = "rtsp" IMAGESEQUENCE = "imagesequence" @staticmethod def needs_clock_sync(protocol): if protocol in [Protocols.HLS, Protocols.DASH]: return True return False def is_tty(): return hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() def supports_ansi_colors(): if 'GST_VALIDATE_LAUNCHER_FORCE_COLORS' in os.environ: return True platform = sys.platform supported_platform = platform != 'win32' or 'ANSICON' in os.environ if not supported_platform or not is_tty(): return False return True class Colors(object): HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' def desactivate_colors(): Colors.HEADER = '' Colors.OKBLUE = '' Colors.OKGREEN = '' Colors.WARNING = '' Colors.FAIL = '' Colors.ENDC = '' if not supports_ansi_colors(): desactivate_colors() def mkdir(directory): try: os.makedirs(directory) except os.error: pass def which(name, extra_path=None): exts = [_f for _f in os.environ.get('PATHEXT', '').split(os.pathsep) if _f] path = os.environ.get('PATH', '') if extra_path: path = extra_path + os.pathsep + path if not path: return [] for p in path.split(os.pathsep): p = os.path.join(p, name) if os.access(p, os.X_OK): return p for e in exts: pext = p + e if os.access(pext, os.X_OK): return pext return None def get_color_for_result(result): if result is Result.FAILED: color = Colors.FAIL elif result is Result.TIMEOUT: color = Colors.WARNING elif result is Result.PASSED: color = Colors.OKGREEN else: color = Colors.OKBLUE return color last_carriage_return_len = 0 def printc(message, color="", title=False, title_char='', end="\n"): global last_carriage_return_len if title or title_char: length = 0 for l in message.split("\n"): if len(l) > length: length = len(l) if length == 0: length = len(message) needed_spaces = ' ' * max(0, last_carriage_return_len - length) if title is True: message = length * "=" + needed_spaces + "\n" \ + str(message) + "\n" + length * '=' else: message = str(message) + needed_spaces + "\n" + \ length * title_char if hasattr(message, "result") and color == '': color = get_color_for_result(message.result) if not is_tty(): end = "\n" message = str(message) message += ' ' * max(0, last_carriage_return_len - len(message)) if end == '\r': term_width = shutil.get_terminal_size((80, 20))[0] if len(message) > term_width: message = message[0:term_width - 2] + '…' last_carriage_return_len = len(message) else: last_carriage_return_len = 0 sys.stdout.write(color + str(message) + Colors.ENDC + end) sys.stdout.flush() def launch_command(command, color=None, fails=False): printc(command, Colors.OKGREEN, True) res = os.system(command) if res != 0 and fails is True: raise subprocess.CalledProcessError(res, "%s failed" % command) def path2url(path): return urllib.parse.urljoin('file:', urllib.request.pathname2url(path)) def is_windows(): platname = platform.system().lower() return platname == 'windows' or 'mingw' in platname def url2path(url): path = urllib.parse.urlparse(url).path if "win32" in sys.platform: if path[0] == '/': return path[1:] # We need to remove the first '/' on windows path = urllib.parse.unquote(path) return path def isuri(string): url = urllib.parse.urlparse(string) if url.scheme != "" and url.scheme != "": return True return False def touch(fname, times=None): with open(fname, 'a'): os.utime(fname, times) def get_subclasses(klass, env): subclasses = [] for symb in env.items(): try: if issubclass(symb[1], klass) and not symb[1] is klass: subclasses.append(symb[1]) except TypeError: pass return subclasses def TIME_ARGS(time): return "%u:%02u:%02u.%09u" % (time / (GST_SECOND * 60 * 60), (time / (GST_SECOND * 60)) % 60, (time / GST_SECOND) % 60, time % GST_SECOND) def look_for_file_in_source_dir(subdir, name): root_dir = os.path.abspath(os.path.dirname( os.path.join(os.path.dirname(os.path.abspath(__file__))))) p = os.path.join(root_dir, subdir, name) if os.path.exists(p): return p return None # Returns the path $top_src_dir/@subdir/@name if running from source, or # $DATADIR/gstreamer-1.0/validate/@name if not def get_data_file(subdir, name): # Are we running from sources? p = look_for_file_in_source_dir(subdir, name) if p: return p # Look in system data dirs p = os.path.join(config.DATADIR, 'gstreamer-1.0', 'validate', name) if os.path.exists(p): return p return None # # Some utilities to parse gst-validate output # # def gsttime_from_tuple(stime): return int((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3])) timeregex = re.compile(r'(?P<_0>.+):(?P<_1>.+):(?P<_2>.+)\.(?P<_3>.+)') def parse_gsttimeargs(time): stime = list(map(itemgetter(1), sorted( timeregex.match(time).groupdict().items()))) return int((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2])) * GST_SECOND + int(stime[3])) def get_duration(media_file): duration = 0 res = '' try: res = subprocess.check_output( [DISCOVERER_COMMAND, media_file]).decode() except subprocess.CalledProcessError: # gst-media-check returns !0 if seeking is not possible, we do not care # in that case. pass for l in res.split('\n'): if "Duration: " in l: duration = parse_gsttimeargs(l.replace("Duration: ", "")) break return duration def get_scenarios(): GST_VALIDATE_COMMAND = "gst-validate-1.0" os.system("%s --scenarios-defs-output-file %s" % (GST_VALIDATE_COMMAND, )) def get_gst_build_valgrind_suppressions(): if hasattr(get_gst_build_valgrind_suppressions, "data"): return get_gst_build_valgrind_suppressions.data get_gst_build_valgrind_suppressions.data = [] if not os.path.exists(os.path.join(config.SRCDIR, "subprojects")): return get_gst_build_valgrind_suppressions.data for suppression_path in ["gstreamer/tests/check/gstreamer.supp", "gst-plugins-base/tests/check/gst-plugins-base.supp", "gst-plugins-good/tests/check/gst-plugins-good.supp", "gst-plugins-bad/tests/check/gst-plugins-bad.supp", "gst-plugins-ugly/tests/check/gst-plugins-ugly.supp", "gst-libav/tests/check/gst-libav.supp", "gst-devtools/validate/data/gstvalidate.supp", "libnice/tests/libnice.supp", "libsoup/tests/libsoup.supp", "glib/glib.supp", "gst-python/testsuite/gstpython.supp", "gst-python/testsuite/python.supp", ]: suppression = os.path.join(config.SRCDIR, "subprojects", suppression_path) if os.path.exists(suppression): get_gst_build_valgrind_suppressions.data.append(suppression) return get_gst_build_valgrind_suppressions.data class BackTraceGenerator(Loggable): __instance = None _command_line_regex = re.compile(r'Command Line: (.*)\n') _timestamp_regex = re.compile(r'Timestamp: .*\((\d*)s ago\)') _pid_regex = re.compile(r'PID: (\d+) \(.*\)') def __init__(self): Loggable.__init__(self) self.in_flatpak = os.path.exists("/usr/manifest.json") if self.in_flatpak: coredumpctl = ['flatpak-spawn', '--host', 'coredumpctl'] else: coredumpctl = ['coredumpctl'] try: subprocess.check_output(coredumpctl) self.coredumpctl = coredumpctl except Exception as e: self.warning(e) self.coredumpctl = None self.gdb = shutil.which('gdb') @classmethod def get_default(cls): if not cls.__instance: cls.__instance = BackTraceGenerator() return cls.__instance def get_trace(self, test): if not test.process.returncode: return self.get_trace_on_running_process(test) if self.coredumpctl: return self.get_trace_from_systemd(test) self.debug("coredumpctl not present, and it is the only" " supported way to get backtraces for now.") return None def get_trace_on_running_process(self, test): if not self.gdb: return "Can not generate stack trace as `gdb` is not" \ "installed." gdb = ['gdb', '-ex', 't a a bt', '-batch', '-p', str(test.process.pid)] try: return subprocess.check_output( gdb, stderr=subprocess.STDOUT, timeout=30).decode() except Exception as e: return "Could not run `gdb` on process (pid: %d):\n%s" % ( test.process.pid, e) def get_trace_from_systemd(self, test): for ntry in range(10): if ntry != 0: # Loopping, it means we conceder the logs might not be ready # yet. time.sleep(1) if not self.in_flatpak: coredumpctl = self.coredumpctl + ['info', str(test.process.pid)] else: newer_than = time.strftime("%a %Y-%m-%d %H:%M:%S %Z", time.localtime(test._starting_time)) coredumpctl = self.coredumpctl + ['info', os.path.basename(test.command[0]), '--since', newer_than] try: info = subprocess.check_output(coredumpctl, stderr=subprocess.STDOUT) except subprocess.CalledProcessError: # The trace might not be ready yet time.sleep(1) continue info = info.decode() try: pid = self._pid_regex.findall(info)[0] except IndexError: self.debug("Backtrace could not be found yet, trying harder.") continue application = test.process.args[0] command_line = BackTraceGenerator._command_line_regex.findall(info)[0] if shlex.split(command_line)[0] != application: self.debug("PID: %s -- executable %s != test application: %s" % ( pid, command_line[0], test.application)) # The trace might not be ready yet continue if not BackTraceGenerator._timestamp_regex.findall(info): self.debug("Timestamp %s is more than 1min old", re.findall(r'Timestamp: .*', info)) # The trace might not be ready yet continue bt_all = None if self.gdb: try: with tempfile.NamedTemporaryFile() as stderr: coredump = subprocess.check_output(self.coredumpctl + ['dump', pid], stderr=stderr) with tempfile.NamedTemporaryFile() as tf: tf.write(coredump) tf.flush() gdb = ['gdb', '-ex', 't a a bt', '-ex', 'quit', application, tf.name] bt_all = subprocess.check_output( gdb, stderr=subprocess.STDOUT).decode() info += "\nThread apply all bt:\n\n%s" % ( bt_all.replace('\n', '\n' + 15 * ' ')) except Exception as e: self.error("Could not get backtrace from gdb: %s" % e) return info return None ALL_GITLAB_ISSUES = defaultdict(list) def check_bugs_resolution(bugs_definitions): bugz = {} gitlab_issues = defaultdict(list) regexes = {} for regex, bugs in bugs_definitions: if isinstance(bugs, str): bugs = [bugs] for bug in bugs: url = urllib.parse.urlparse(bug) if "gitlab" in url.netloc: components = [c for c in url.path.split('/') if c] if len(components) != 4: printc("\n + %s \n --> bug: %s\n --> Status: Not a proper gitlab report" % (regex, bug), Colors.WARNING) continue project_id = components[0] + '%2F' + components[1] issue_id = components[3] gitlab_url = "https://%s/api/v4/projects/%s/issues/%s" % (url.hostname, project_id, issue_id) if gitlab_url in ALL_GITLAB_ISSUES: continue gitlab_issues[gitlab_url].append(regex) ALL_GITLAB_ISSUES[gitlab_url].append(regex) continue if "bugzilla" not in url.netloc: continue query = urllib.parse.parse_qs(url.query) _id = query.get('id') if not _id: printc("\n + '%s' -- Can't check bug '%s'" % (regex, bug), Colors.WARNING) continue if isinstance(_id, list): _id = _id[0] regexes[_id] = (regex, bug) url_parts = tuple(list(url)[:3] + ['', '', '']) ids = bugz.get(url_parts, []) ids.append(_id) bugz[url_parts] = ids res = True for gitlab_url, regexe in gitlab_issues.items(): try: issue = json.load(urllib.request.urlopen(gitlab_url)) except Exception as e: printc("\n + Could not properly check bugs status for: %s (%s)" % (gitlab_url, e), Colors.FAIL) continue if issue['state'] in ['closed']: printc("\n + %s \n --> %s: '%s'\n ==> Bug CLOSED already (status: %s)" % ( regexe, issue['web_url'], issue['title'], issue['state']), Colors.FAIL) res = False for url_parts, ids in bugz.items(): url_parts = list(url_parts) query = {'id': ','.join(ids)} query['ctype'] = 'xml' url_parts[4] = urllib.parse.urlencode(query) try: res = urllib.request.urlopen(urllib.parse.urlunparse(url_parts)) except Exception as e: printc("\n + Could not properly check bugs status for: %s (%s)" % (urllib.parse.urlunparse(url_parts), e), Colors.FAIL) continue root = ElementTree.fromstring(res.read()) bugs = root.findall('./bug') if len(bugs) != len(ids): printc("\n + Could not properly check bugs status on server %s" % urllib.parse.urlunparse(url_parts), Colors.FAIL) continue for bugelem in bugs: status = bugelem.findtext('./bug_status') bugid = bugelem.findtext('./bug_id') regex, bug = regexes[bugid] desc = bugelem.findtext('./short_desc') if not status: printc("\n + %s \n --> bug: %s\n --> Status: UNKNOWN" % (regex, bug), Colors.WARNING) continue if not status.lower() in ['new', 'verified']: printc("\n + %s \n --> bug: #%s: '%s'\n ==> Bug CLOSED already (status: %s)" % ( regex, bugid, desc, status), Colors.WARNING) res = False printc("\n + %s \n --> bug: #%s: '%s'\n --> Status: %s" % ( regex, bugid, desc, status), Colors.OKGREEN) if not res: printc("\n==> Some bugs marked as known issues have been closed!", Colors.FAIL) return res def kill_subprocess(owner, process, timeout): if process is None: return stime = time.time() res = process.poll() waittime = 0.05 killsig = None if not is_windows(): killsig = signal.SIGINT while res is None: try: owner.debug("Subprocess is still alive, sending KILL signal") if is_windows(): subprocess.call( ['taskkill', '/F', '/T', '/PID', str(process.pid)]) else: process.send_signal(killsig) time.sleep(waittime) waittime *= 2 except OSError: pass if not is_windows() and time.time() - stime > timeout / 4: killsig = signal.SIGKILL if time.time() - stime > timeout: printc("Could not kill %s subprocess after %s second" " Something is really wrong, => EXITING" % (owner, timeout), Colors.FAIL) return res = process.poll() return res def format_config_template(extra_data, config_text, test_name): # Variables available for interpolation inside config blocks. extra_vars = extra_data.copy() if 'validate-flow-expectations-dir' in extra_vars and \ 'validate-flow-actual-results-dir' in extra_vars: expectations_dir = os.path.join(extra_vars['validate-flow-expectations-dir'], test_name.replace('.', os.sep)) actual_results_dir = os.path.join(extra_vars['validate-flow-actual-results-dir'], test_name.replace('.', os.sep)) extra_vars['validateflow'] = "validateflow, expectations-dir=\"%s\", actual-results-dir=\"%s\"" % (expectations_dir, actual_results_dir) if 'ssim-results-dir' in extra_vars: ssim_results = extra_vars['ssim-results-dir'] extra_vars['ssim'] = "validatessim, result-output-dir=\"%s\", output-dir=\"%s\"" % ( os.path.join(ssim_results, test_name.replace('.', os.sep), 'diff-images'), os.path.join(ssim_results, test_name.replace('.', os.sep), 'images'), ) return config_text % extra_vars def get_fakesink_for_media_type(media_type, needs_clock=False): if media_type == "video": if needs_clock: return 'fakevideosink qos=true max-lateness=20000000' return "fakevideosink sync=false" if needs_clock: return "fakesink sync=true" return "fakesink" class InvalidValueError(ValueError): """Received value is invalid""" def __init__(self, name, value, expect): ValueError.__init__( self, "Invalid value {!r} for {}. Expect {}.".format( value, name, expect)) def wrong_type_for_arg(val, expect_type_name, arg_name): """Raise exception in response to a wrong argument type""" raise TypeError( "Expect a {} type for the '{}' argument. Received a {} type." "".format(expect_type_name, arg_name, type(val).__name__)) class DeserializeError(Exception): """Receive an incorrectly serialized value""" MAX_LEN = 20 def __init__(self, read, reason): if len(read) > self.MAX_LEN: read = read[:self.MAX_LEN] + "..." Exception.__init__( self, "Could not deserialize the string ({}) because it {}." "".format(read, reason)) class GstStructure(Loggable): """ This implementation has been copied from OpenTimelineIO. Note that the types are to correspond to GStreamer/GES GTypes, rather than python types. Current supported GTypes: GType Associated Accepted Python type aliases ====================================== gint int int, i glong int gint64 int guint int uint, u gulong int guint64 int gfloat float float, f gdouble float double, d gboolean bool boolean, bool, b string str or None str, s GstFraction str or fraction Fraction GstStructure GstStructure structure schema GstCaps GstCaps schema Note that other types can be given: these must be given as strings and the user will be responsible for making sure they are already in a serialized form. """ INT_TYPES = ("int", "glong", "gint64") UINT_TYPES = ("uint", "gulong", "guint64") FLOAT_TYPES = ("float", "double") BOOLEAN_TYPE = "boolean" FRACTION_TYPE = "fraction" STRING_TYPE = "string" STRUCTURE_TYPE = "structure" CAPS_TYPE = "GstCaps" KNOWN_TYPES = INT_TYPES + UINT_TYPES + FLOAT_TYPES + ( BOOLEAN_TYPE, FRACTION_TYPE, STRING_TYPE, STRUCTURE_TYPE, CAPS_TYPE) TYPE_ALIAS = { "i": "int", "gint": "int", "u": "uint", "guint": "uint", "f": "float", "gfloat": "float", "d": "double", "gdouble": "double", "b": BOOLEAN_TYPE, "bool": BOOLEAN_TYPE, "gboolean": BOOLEAN_TYPE, "GstFraction": FRACTION_TYPE, "str": STRING_TYPE, "s": STRING_TYPE, "GstStructure": STRUCTURE_TYPE } def __init__(self, name=None, fields=None): if name is None: name = "Unnamed" if fields is None: fields = {} if type(name) is not str: wrong_type_for_arg(name, "str", "name") self._check_name(name) self.name = name try: fields = dict(fields) except (TypeError, ValueError): wrong_type_for_arg(fields, "dict", "fields") self.fields = {} for key in fields: entry = fields[key] if type(entry) is not tuple: try: entry = tuple(entry) except (TypeError, ValueError): raise TypeError( "Expect dict to be filled with tuple-like " "entries") if len(entry) != 2: raise TypeError( "Expect dict to be filled with 2-entry tuples") self.set(key, *entry) def __repr__(self): return "GstStructure({!r}, {!r})".format(self.name, self.fields) UNKNOWN_PREFIX = "[UNKNOWN]" @classmethod def _make_type_unknown(cls, _type): return cls.UNKNOWN_PREFIX + _type # note the sqaure brackets make the type break the TYPE_FORMAT @classmethod def _is_unknown_type(cls, _type): return _type[:len(cls.UNKNOWN_PREFIX)] == cls.UNKNOWN_PREFIX @classmethod def _get_unknown_type(cls, _type): return _type[len(cls.UNKNOWN_PREFIX):] def _field_to_str(self, key): """Return field in a serialized form""" _type, value = self.fields[key] if type(key) is not str: raise TypeError("Found a key that is not a str type") if type(_type) is not str: raise TypeError( "Found a type name that is not a str type") self._check_key(key) _type = self.TYPE_ALIAS.get(_type, _type) if self._is_unknown_type(_type): _type = self._get_unknown_type(_type) self._check_type(_type) self._check_unknown_typed_value(value) # already in serialized form else: self._check_type(_type) value = self.serialize_value(_type, value) return "{}=({}){}".format(key, _type, value) def _fields_to_str(self): write = [] for key in self.fields: write.append(", {}".format(self._field_to_str(key))) return "".join(write) def _name_to_str(self): """Return the name in a serialized form""" return self._check_name(self.name) def __str__(self): """Emulates gst_structure_to_string""" return "{}{};".format(self._name_to_str(), self._fields_to_str()) def get_type_name(self, key): """Return the field type""" _type = self.fields[key][0] return _type def get_value(self, key): """Return the field value""" value = self.fields[key][1] return value def __getitem__(self, key): return self.get_value(key) def __len__(self): return len(self.fields) @staticmethod def _val_type_err(typ, val, expect): raise TypeError( "Received value ({!s}) is a {} rather than a {}, even " "though the {} type was given".format( val, type(val).__name__, expect, typ)) def set(self, key, _type, value): """Set a field to the given typed value""" if type(key) is not str: wrong_type_for_arg(key, "str", "key") if type(_type) is not str: wrong_type_for_arg(_type, "str", "_type") _type = self.TYPE_ALIAS.get(_type, _type) if self.fields.get(key) == (_type, value): return self._check_key(key) type_is_unknown = True if self._is_unknown_type(_type): # this can happen if the user is setting a GstStructure # using a preexisting GstStructure, the type will then # be passed and marked as unknown _type = self._get_unknown_type(_type) self._check_type(_type) else: self._check_type(_type) if _type in self.INT_TYPES: type_is_unknown = False if not isinstance(value, int): self._val_type_err(_type, value, "int") elif _type in self.UINT_TYPES: type_is_unknown = False if not isinstance(value, int): self._val_type_err(_type, value, "int") if value < 0: raise InvalidValueError( "value", value, "a positive integer for {} " "types".format(_type)) elif _type in self.FLOAT_TYPES: type_is_unknown = False if type(value) is not float: self._val_type_err(_type, value, "float") elif _type == self.BOOLEAN_TYPE: type_is_unknown = False if type(value) is not bool: self._val_type_err(_type, value, "bool") elif _type == self.FRACTION_TYPE: type_is_unknown = False if type(value) is Fraction: value = value elif type(value) is str: try: Fraction(value) except ValueError: raise InvalidValueError( "value", value, "a fraction for the {} " "types".format(_type)) else: self._val_type_err(_type, value, "Fraction or str") elif _type == self.STRING_TYPE: type_is_unknown = False if value is not None and type(value) is not str: self._val_type_err(_type, value, "str or None") elif _type == self.STRUCTURE_TYPE: type_is_unknown = False if not isinstance(value, GstStructure): self._val_type_err(_type, value, "GstStructure") elif _type == self.CAPS_TYPE: type_is_unknown = False if not isinstance(value, GstCaps): self._val_type_err(_type, value, "GstCaps") if type_is_unknown: self._check_unknown_typed_value(value) warning('GstStructure', "The GstStructure type {} with the value ({}) is " "unknown. The value will be stored and serialized as " "given.".format(_type, value)) _type = self._make_type_unknown(_type) self.fields[key] = (_type, value) def get(self, key, default=None): """Return the raw value associated with key""" if key in self.fields: value = self.get_value(key) return value return default def get_typed(self, key, expect_type, default=None): """ Return the raw value associated with key if its type matches. Raises a warning if a value exists under key but is of the wrong type. """ if type(expect_type) is not str: wrong_type_for_arg(expect_type, "str", "expect_type") expect_type = self.TYPE_ALIAS.get(expect_type, expect_type) if key in self.fields: type_name = self.get_type_name(key) if expect_type == type_name: value = self.get_value(key) return value warning('GstStructure', "The structure {} contains a value under {}, but is " "a {}, rather than the expected {} type".format( self.name, key, type_name, expect_type)) return default def values(self): """Return a list of all values contained in the structure""" return [self.get_value(key) for key in self.fields] def values_of_type(self, _type): """ Return a list of all values contained of the given type in the structure """ if type(_type) is not str: wrong_type_for_arg(_type, "str", "_type") _type = self.TYPE_ALIAS.get(_type, _type) return [self.get_value(key) for key in self.fields if self.get_type_name(key) == _type] ASCII_SPACES = r"(\\?[ \t\n\r\f\v])*" END_FORMAT = r"(?P" + ASCII_SPACES + r")" NAME_FORMAT = r"(?P[a-zA-Z][a-zA-Z0-9/_.:-]*)" # ^Format requirement for the name of a GstStructure SIMPLE_STRING = r"[a-zA-Z0-9_+/:.-]+" # see GST_ASCII_CHARS (below) KEY_FORMAT = r"(?P" + SIMPLE_STRING + r")" # NOTE: GstStructure technically allows more general keys, but # these can break the parsing. TYPE_FORMAT = r"(?P" + SIMPLE_STRING + r")" BASIC_VALUE_FORMAT = \ r'(?P("(\\.|[^"])*")|(' + SIMPLE_STRING + r'))' # consume simple string or a string between quotes. Second will # consume anything that is escaped, including a '"' # NOTE: \\. is used rather than \\" since: # + '"start\"end;"' should be captured as '"start\"end"' since # the '"' is escaped. # + '"start\\"end;"' should be captured as '"start\\"' since the # '\' is escaped, not the '"' # In the fist case \\. will consume '\"', and in the second it will # consumer '\\', as desired. The second would not work with just \\" @staticmethod def _check_against_regex(check, regex, name): if not regex.fullmatch(check): raise InvalidValueError( name, check, "to match the regular expression {}" "".format(regex.pattern)) NAME_REGEX = re.compile(NAME_FORMAT) KEY_REGEX = re.compile(KEY_FORMAT) TYPE_REGEX = re.compile(TYPE_FORMAT) @classmethod def _check_name(cls, name): cls._check_against_regex(name, cls.NAME_REGEX, "name") @classmethod def _check_key(cls, key): cls._check_against_regex(key, cls.KEY_REGEX, "key") @classmethod def _check_type(cls, _type): cls._check_against_regex(_type, cls.TYPE_REGEX, "type") @classmethod def _check_unknown_typed_value(cls, value): if type(value) is not str: cls._val_type_err("unknown", value, "string") try: # see if the value could be successfully parsed in again ret_type, ret_val, _ = cls._parse_value(value, False) except DeserializeError as err: raise InvalidValueError( "value", value, "unknown-typed values to be in a " "serialized format ({!s})".format(err)) else: if ret_type is not None: raise InvalidValueError( "value", value, "unknown-typed values to *not* " "start with a type specification, only the " "serialized value should be given") if ret_val != value: raise InvalidValueError( "value", value, "unknown-typed values to be the " "same as its parsed value {}".format(ret_val)) PARSE_NAME_REGEX = re.compile( ASCII_SPACES + NAME_FORMAT + END_FORMAT) @classmethod def _parse_name(cls, read): match = cls.PARSE_NAME_REGEX.match(read) if match is None: raise DeserializeError( read, "does not start with a correct name") name = match.group("name") read = read[match.end("end"):] return name, read @classmethod def _parse_range_list_array(cls, read): start = read[0] end = {'[': ']', '{': '}', '<': '>'}.get(start) read = read[1:] values = [start, ' '] first = True while read and read[0] != end: if first: first = False else: if read and read[0] != ',': DeserializeError( read, "does not contain a comma between listed " "items") values.append(", ") read = read[1:] _type, value, read = cls._parse_value(read, False) if _type is not None: if cls._is_unknown_type(_type): # remove unknown marker for serialization _type = cls._get_unknown_type(_type) values.extend(('(', _type, ')')) values.append(value) if not read: raise DeserializeError( read, "ended before {} could be found".format(end)) read = read[1:] # skip past 'end' match = cls.END_REGEX.match(read) # skip whitespace read = read[match.end("end"):] # NOTE: we are ignoring the incorrect cases where a range # has 0, 1 or 4+ values! This is the users responsiblity. values.extend((' ', end)) return "".join(values), read FIELD_START_REGEX = re.compile( ASCII_SPACES + KEY_FORMAT + ASCII_SPACES + r"=" + END_FORMAT) FIELD_TYPE_REGEX = re.compile( ASCII_SPACES + r"(\(" + ASCII_SPACES + TYPE_FORMAT + ASCII_SPACES + r"\))?" + END_FORMAT) FIELD_VALUE_REGEX = re.compile( ASCII_SPACES + BASIC_VALUE_FORMAT + END_FORMAT) END_REGEX = re.compile(END_FORMAT) @classmethod def _parse_value(cls, read, deserialize=True): match = cls.FIELD_TYPE_REGEX.match(read) # match shouldn't be None since the (TYPE_FORMAT) is optional # and the rest is just ASCII_SPACES _type = match.group("type") if _type is None and deserialize: # if deserialize is False, the (type) is optional raise DeserializeError( read, "does not contain a valid '(type)' format") _type = cls.TYPE_ALIAS.get(_type, _type) type_is_unknown = True read = read[match.end("end"):] if read and read[0] in ('[', '{', '<'): # range/list/array types # this is an unknown type, even though _type itself may # be known. e.g. a list on integers will have _type as 'int' # but the corresponding value can not be deserialized as an # integer value, read = cls._parse_range_list_array(read) if deserialize: # prevent printing on subsequent calls if we find a # list within a list, etc. warning('GstStructure', "GstStructure received a range/list/array of type " "{}, which can not be deserialized. Storing the " "value as {}.".format(_type, value)) else: match = cls.FIELD_VALUE_REGEX.match(read) if match is None: raise DeserializeError( read, "does not have a valid value format") read = read[match.end("end"):] value = match.group("value") if deserialize: if _type in cls.KNOWN_TYPES: type_is_unknown = False try: value = cls.deserialize_value(_type, value) except DeserializeError as err: raise DeserializeError( read, "contains an invalid typed value " "({!s})".format(err)) else: warning('GstStructure', "GstStructure found a type {} that is unknown. " "The corresponding value ({}) will not be " "deserialized and will be stored as given." "".format(_type, value)) if type_is_unknown and _type is not None: _type = cls._make_type_unknown(_type) return _type, value, read @classmethod def _parse_field(cls, read): match = cls.FIELD_START_REGEX.match(read) if match is None: raise DeserializeError( read, "does not have a valid 'key=...' format") key = match.group("key") read = read[match.end("end"):] _type, value, read = cls._parse_value(read) return key, _type, value, read @classmethod def _parse_fields(cls, read): if type(read) is not str: wrong_type_for_arg(read, "str", "read") fields = {} while read and read[0] != ';': if read and read[0] != ',': DeserializeError( read, "does not separate fields with commas") read = read[1:] key, _type, value, read = cls._parse_field(read) fields[key] = (_type, value) if read: # read[0] == ';' read = read[1:] return fields, read @classmethod def new_from_str(cls, read): """ Returns a new instance of GstStructure, based on the Gst library function gst_structure_from_string. Strings obtained from the GstStructure str() method can be parsed in to recreate the original GstStructure. """ if type(read) is not str: wrong_type_for_arg(read, "str", "read") name, read = cls._parse_name(read) fields = cls._parse_fields(read)[0] return GstStructure(name=name, fields=fields) @staticmethod def _val_read_err(typ, val): raise DeserializeError( val, "does not translated to the {} type".format(typ)) @classmethod def deserialize_value(cls, _type, value): """Return the value as the corresponding type""" if type(_type) is not str: wrong_type_for_arg(_type, "str", "_type") if type(value) is not str: wrong_type_for_arg(value, "str", "value") _type = cls.TYPE_ALIAS.get(_type, _type) if _type in cls.INT_TYPES or _type in cls.UINT_TYPES: try: value = int(value) except ValueError: cls._val_read_err(_type, value) if _type in cls.UINT_TYPES and value < 0: cls._val_read_err(_type, value) elif _type in cls.FLOAT_TYPES: try: value = float(value) except ValueError: cls._val_read_err(_type, value) elif _type == cls.BOOLEAN_TYPE: try: value = cls.deserialize_boolean(value) except DeserializeError: cls._val_read_err(_type, value) elif _type == cls.FRACTION_TYPE: try: value = Fraction(value) except ValueError: cls._val_read_err(_type, value) elif _type == cls.STRING_TYPE: try: value = cls.deserialize_string(value) except DeserializeError as err: raise DeserializeError( value, "does not translate to a string ({!s})" "".format(err)) elif _type == cls.STRUCTURE_TYPE: try: value = cls.deserialize_structure(value) except DeserializeError as err: raise DeserializeError( value, "does not translate to a GstStructure ({!s})" "".format(err)) elif _type == cls.CAPS_TYPE: try: value = cls.deserialize_caps(value) except DeserializeError as err: raise DeserializeError( value, "does not translate to a GstCaps ({!s})" "".format(err)) else: raise ValueError( "The type {} is unknown, so the value ({}) can not " "be deserialized.".format(_type, value)) return value @classmethod def serialize_value(cls, _type, value): """Serialize the typed value as a string""" if type(_type) is not str: wrong_type_for_arg(_type, "str", "_type") _type = cls.TYPE_ALIAS.get(_type, _type) if _type in cls.INT_TYPES + cls.UINT_TYPES + cls.FLOAT_TYPES \ + (cls.FRACTION_TYPE, ): return str(value) if _type == cls.BOOLEAN_TYPE: return cls.serialize_boolean(value) if _type == cls.STRING_TYPE: return cls.serialize_string(value) if _type == cls.STRUCTURE_TYPE: return cls.serialize_structure(value) if _type == cls.CAPS_TYPE: return cls.serialize_caps(value) raise ValueError( "The type {} is unknown, so the value ({}) can not be " "serialized.".format(_type, str(value))) # see GST_ASCII_IS_STRING in gst_private.h GST_ASCII_CHARS = [ ord(l) for l in "abcdefghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "0123456789" "_-+/:." ] LEADING_OCTAL_CHARS = [ord(l) for l in "0123"] OCTAL_CHARS = [ord(l) for l in "01234567"] @classmethod def serialize_string(cls, value): """ Emulates gst_value_serialize_string. Accepts a bytes, str or None type. Returns a str type. """ if value is not None and type(value) is not str: wrong_type_for_arg(value, "None or str", "value") return cls._wrap_string(value) @classmethod def _wrap_string(cls, read): if read is None: return "NULL" if read == "NULL": return "\"NULL\"" if type(read) is bytes: pass elif type(read) is str: read = read.encode() else: wrong_type_for_arg(read, "None, str, or bytes", "read") if not read: return '""' added_wrap = False ser_string_list = [] for byte in read: if byte in cls.GST_ASCII_CHARS: ser_string_list.append(chr(byte)) elif byte < 0x20 or byte >= 0x7f: ser_string_list.append("\\{:03o}".format(byte)) added_wrap = True else: ser_string_list.append("\\" + chr(byte)) added_wrap = True if added_wrap: ser_string_list.insert(0, '"') ser_string_list.append('"') return "".join(ser_string_list) @classmethod def deserialize_string(cls, read): """ Emulates gst_value_deserialize_string. Accepts a str type. Returns a str or None type. """ if type(read) is not str: wrong_type_for_arg(read, "str", "read") if read == "NULL": return None if not read: return "" if read[0] != '"' or read[-1] != '"': return read return cls._unwrap_string(read) @classmethod def _unwrap_string(cls, read): """Emulates gst_string_unwrap""" if type(read) is not bytes: read_array = read.encode() byte_list = [] bytes_iter = iter(read_array) def next_byte(): try: return next(bytes_iter) except StopIteration: raise DeserializeError(read, "end unexpectedly") byte = next_byte() if byte != ord('"'): raise DeserializeError( read, "does not start with '\"', but ends with '\"'") while True: byte = next_byte() if byte in cls.GST_ASCII_CHARS: byte_list.append(byte) elif byte == ord('"'): try: next(bytes_iter) except StopIteration: # expect there to be no more bytes break raise DeserializeError( read, "contains an un-escaped '\"' before the end") elif byte == ord('\\'): byte = next_byte() if byte in cls.LEADING_OCTAL_CHARS: # could be the start of an octal byte2 = next_byte() byte3 = next_byte() if byte2 in cls.OCTAL_CHARS and byte3 in cls.OCTAL_CHARS: nums = [b - ord('0') for b in (byte, byte2, byte3)] byte = (nums[0] << 6) + (nums[1] << 3) + nums[2] byte_list.append(byte) else: raise DeserializeError( read, "contains the start of an octal " "sequence but not the end") else: if byte == 0: raise DeserializeError( read, "contains a null byte after an escape") byte_list.append(byte) else: raise DeserializeError( read, "contains an unexpected un-escaped character") out_str = bytes(bytearray(byte_list)) try: return out_str.decode() except (UnicodeError, ValueError): raise DeserializeError( read, "contains invalid utf-8 byte sequences") @staticmethod def serialize_boolean(value): """ Emulates gst_value_serialize_boolean. Accepts bool type. Returns a str type. """ if type(value) is not bool: wrong_type_for_arg(value, "bool", "value") if value: return "true" return "false" @staticmethod def deserialize_boolean(read): """ Emulates gst_value_deserialize_boolean. Accepts str type. Returns a bool type. """ if type(read) is not str: wrong_type_for_arg(read, "str", "read") if read.lower() in ("true", "t", "yes", "1"): return True if read.lower() in ("false", "f", "no", "0"): return False raise DeserializeError(read, "is an unknown boolean value") @classmethod def serialize_structure(cls, value): """ Emulates gst_value_serialize_structure. Accepts a GstStructure. Returns a str type. """ if not isinstance(value, GstStructure): wrong_type_for_arg(value, "GstStructure", "value") return cls._wrap_string(str(value)) @classmethod def deserialize_structure(cls, read): """ Emulates gst_value_serialize_structure. Accepts a str type. Returns a GstStructure. """ if type(read) is not str: wrong_type_for_arg(read, "str", "read") if read[0] == '"': # NOTE: since all GstStructure strings end with ';', we # don't ever expect the above to *not* be true, but the # GStreamer library allows for this case try: read = cls._unwrap_string(read) # NOTE: in the GStreamer library, serialized # GstStructure and GstCaps strings are sent to # _priv_gst_value_parse_string with unescape set to # TRUE. What this essentially does is replace "\x" with # just "x". Since caps and structure strings should only # contain printable ascii characters before they are # passed to _wrap_string, this should be equivalent to # calling _unwrap_string. Our method is more clearly a # reverse of the serialization method. except DeserializeError as err: raise DeserializeError( read, "could not be unwrapped as a string ({!s})" "".format(err)) return GstStructure.new_from_str(read) @classmethod def serialize_caps(cls, value): """ Emulates gst_value_serialize_caps. Accepts a GstCaps. Returns a str type. """ if not isinstance(value, GstCaps): wrong_type_for_arg(value, "GstCaps", "value") return cls._wrap_string(str(value)) @classmethod def deserialize_caps(cls, read): """ Emulates gst_value_serialize_caps. Accepts a str type. Returns a GstCaps. """ if type(read) is not str: wrong_type_for_arg(read, "str", "read") if read[0] == '"': # can be not true if a caps only contains a single empty # structure, or is ALL or NONE try: read = cls._unwrap_string(read) except DeserializeError as err: raise DeserializeError( read, "could not be unwrapped as a string ({!s})" "".format(err)) return GstCaps.new_from_str(read) @staticmethod def _escape_string(read): """ Emulates some of g_strescape's behaviour in ges_marker_list_serialize """ # NOTE: in the original g_strescape, all the special characters # '\b', '\f', '\n', '\r', '\t', '\v', '\' and '"' are escaped, # and all characters in the range 0x01-0x1F and non-ascii # characters are replaced by an octal sequence # (similar to _wrap_string). # However, a caps string should only contain printable ascii # characters, so it should be sufficient to simply escape '\' # and '"'. escaped = ['"'] for character in read: if character in ('"', '\\'): escaped.append('\\') escaped.append(character) escaped.append('"') return "".join(escaped) @staticmethod def _unescape_string(read): """ Emulates behaviour of _priv_gst_value_parse_string with unescape set to TRUE. This should undo _escape_string """ if read[0] != '"': return read character_iter = iter(read) def next_char(): try: return next(character_iter) except StopIteration: raise DeserializeError(read, "ends unexpectedly") next_char() # skip '"' unescaped = [] while True: character = next_char() if character == '"': break if character == '\\': unescaped.append(next_char()) else: unescaped.append(character) return "".join(unescaped) class GstCapsFeatures(): """ Mimicking a GstCapsFeatures. """ def __init__(self, *features): """ Initialize the GstCapsFeatures. 'features' should be a series of feature names as strings. """ self.is_any = False self.features = [] for feature in features: if type(feature) is not str: wrong_type_for_arg(feature, "strs", "features") self._check_feature(feature) self.features.append(feature) # NOTE: if 'features' is a str, rather than a list of strs # then this will iterate through all of its characters! But, # a single character can not match the feature regular # expression. def __getitem__(self, index): return self.features[index] def __len__(self): return len(self.features) @classmethod def new_any(cls): features = cls() features.is_any = True return features # Based on gst_caps_feature_name_is_valid FEATURE_FORMAT = r"(?P[a-zA-Z]*:[a-zA-Z][a-zA-Z0-9]*)" FEATURE_REGEX = re.compile(FEATURE_FORMAT) @classmethod def _check_feature(cls, feature): if not cls.FEATURE_REGEX.fullmatch(feature): raise InvalidValueError( "feature", feature, "to match the regular expression " "{}".format(cls.FEATURE_REGEX.pattern)) PARSE_FEATURE_REGEX = re.compile( r" *" + FEATURE_FORMAT + "(?P)") @classmethod def new_from_str(cls, read): """ Returns a new instance of GstCapsFeatures, based on the Gst library function gst_caps_features_from_string. Strings obtained from the GstCapsFeatures str() method can be parsed in to recreate the original GstCapsFeatures. """ if type(read) is not str: wrong_type_for_arg(read, "str", "read") if read == "ANY": return cls.new_any() first = True features = [] while read: if first: first = False else: if read[0] != ',': DeserializeError( read, "does not separate features with commas") read = read[1:] match = cls.PARSE_FEATURE_REGEX.match(read) if match is None: raise DeserializeError( read, "does not match the regular expression {}" "".format(cls.PARSE_FEATURE_REGEX.pattern)) features.append(match.group("feature")) read = read[match.end("end"):] return cls(*features) def __repr__(self): if self.is_any: return "GstCapsFeatures.new_any()" write = ["GstCapsFeatures("] first = True for feature in self.features: if first: first = False else: write.append(", ") write.append(repr(feature)) write.append(")") return "".join(write) def __str__(self): """Emulate gst_caps_features_to_string""" if not self.features and self.is_any: return "ANY" write = [] first = True for feature in self.features: if type(feature) is not str: raise TypeError( "Found a feature that is not a str type") if first: first = False else: write.append(", ") write.append(feature) return "".join(write) class GstCaps: GST_CAPS_FLAG_ANY = 1 << 4 # from GST_MINI_OBJECT_FLAG_LAST def __init__(self, *structs): """ Initialize the GstCaps. 'structs' should be a series of GstStructures, and GstCapsFeatures pairs: struct0, features0, struct1, features1, ... None may be given in place of a GstCapsFeatures, in which case an empty features is assigned to the structure. Note, this instance will need to take ownership of any given GstStructure or GstCapsFeatures. """ if len(structs) % 2: raise InvalidValueError( "*structs", structs, "an even number of arguments") self.flags = 0 self.structs = [] struct = None for index, arg in enumerate(structs): if index % 2 == 0: struct = arg else: self.append(struct, arg) def get_structure(self, index): """Return the GstStructure at the given index""" return self.structs[index][0] def get_features(self, index): """Return the GstStructure at the given index""" return self.structs[index][1] def __getitem__(self, index): return self.get_structure(index) def __len__(self): return len(self.structs) def __iter__(self): for s in self.structs: yield s @classmethod def new_any(cls): caps = cls() caps.flags = cls.GST_CAPS_FLAG_ANY return caps def is_any(self): return self.flags & self.GST_CAPS_FLAG_ANY != 0 FEATURES_FORMAT = r"\((?P[^)]*)\)" NAME_FEATURES_REGEX = re.compile( GstStructure.ASCII_SPACES + GstStructure.NAME_FORMAT + r"(" + FEATURES_FORMAT + r")?" + GstStructure.END_FORMAT) @classmethod def new_from_str(cls, read): """ Returns a new instance of GstCaps, based on the Gst library function gst_caps_from_string. Strings obtained from the GstCaps str() method can be parsed in to recreate the original GstCaps. """ if type(read) is not str: wrong_type_for_arg(read, "str", "read") if read == "ANY": return cls.new_any() if read in ("EMPTY", "NONE"): return cls() structs = [] # restriction-caps is otherwise serialized in the format: # "struct-name-nums(feature), " # "field1=(type1)val1, field2=(type2)val2; " # "struct-name-alphas(feature), " # "fieldA=(typeA)valA, fieldB=(typeB)valB" # Note the lack of ';' for the last structure, and the # '(feature)' is optional. # # NOTE: gst_caps_from_string also accepts: # "struct-name(feature" # without the final ')', but this must be the end of the string, # but we will require that this final ')' is still given while read: match = cls.NAME_FEATURES_REGEX.match(read) if match is None: raise DeserializeError( read, "does not match the regular expression {}" "".format(cls.NAME_FEATURE_REGEX.pattern)) read = read[match.end("end"):] name = match.group("name") features = match.group("features") # NOTE: features may be None since the features part of the # regular expression is optional if features is None: features = GstCapsFeatures() else: features = GstCapsFeatures.new_from_str(features) fields, read = GstStructure._parse_fields(read) structs.append(GstStructure(name, fields)) structs.append(features) return cls(*structs) def __repr__(self): if self.is_any(): return "GstCaps.new_any()" write = ["GstCaps("] first = True for struct in self.structs: if first: first = False else: write.append(", ") write.append(repr(struct[0])) write.append(", ") write.append(repr(struct[1])) write.append(")") return "".join(write) def __str__(self): """Emulate gst_caps_to_string""" if self.is_any(): return "ANY" if not self.structs: return "EMPTY" first = True write = [] for struct, features in self.structs: if first: first = False else: write.append("; ") write.append(struct._name_to_str()) if features.is_any or features.features: # NOTE: is gst_caps_to_string, the feature will not # be written if it only contains the # GST_FEATURE_MEMORY_SYSTEM_MEMORY feature, since this # considered equal to being an empty features. # We do not seem to require this behaviour write.append("({!s})".format(features)) write.append(struct._fields_to_str()) return "".join(write) def append(self, structure, features=None): """Append a structure with the given features""" if not isinstance(structure, GstStructure): wrong_type_for_arg(structure, "GstStructure", "structure") if features is None: features = GstCapsFeatures() if not isinstance(features, GstCapsFeatures): wrong_type_for_arg( features, "GstCapsFeatures or None", "features") self.structs.append((structure, features))