diff --git a/validate/launcher/apps/gstvalidate.py b/validate/launcher/apps/gstvalidate.py index 7242dddda8..42def98195 100644 --- a/validate/launcher/apps/gstvalidate.py +++ b/validate/launcher/apps/gstvalidate.py @@ -27,7 +27,9 @@ import socket import subprocess import configparser import json -from launcher.loggable import Loggable +import glob +import math +from launcher.loggable import Loggable, error from launcher.baseclasses import GstValidateTest, Test, \ ScenarioManager, NamedDic, GstValidateTestsGenerator, \ @@ -36,7 +38,8 @@ from launcher.baseclasses import GstValidateTest, Test, \ from launcher.utils import path2url, url2path, DEFAULT_TIMEOUT, which, \ GST_SECOND, Result, Protocols, mkdir, printc, Colors, get_data_file, \ - kill_subprocess, format_config_template, get_fakesink_for_media_type + kill_subprocess, format_config_template, get_fakesink_for_media_type, \ + parse_gsttimeargs, GstCaps # # Private global variables # @@ -337,6 +340,8 @@ class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator): scenarios_to_iterate = scenarios config_files = extra_data.get('config_files') + mediainfo = extra_data.get( + 'media_info', FakeMediaDescriptor(extra_data, pipeline)) for scenario in scenarios_to_iterate: if isinstance(scenario, str): tmpscenario = self.test_manager.scenarios_manager.get_scenario( @@ -345,7 +350,6 @@ class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator): raise RuntimeError("Could not find scenario file: %s" % scenario) scenario = tmpscenario - mediainfo = FakeMediaDescriptor(extra_data, pipeline) if not mediainfo.is_compatible(scenario): continue @@ -470,6 +474,98 @@ class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator): rtsp2=True)) +class GstValidateCheckAccurateSeekingTestGenerator(GstValidatePipelineTestsGenerator): + def __new__(cls, name, test_manager, media_infos, extra_data=None): + pipelines = {} + + for path, reference_frame_dir in media_infos: + media_info = GstValidateMediaDescriptor(path) + media_info.set_protocol("file") + if not media_info: + error("GstValidateCheckAccurateSeekingTestGenerator", + "Could not create a media info file from %s" % path) + continue + + if media_info.is_image(): + error("GstValidateCheckAccurateSeekingTestGenerator", + "%s is an image, can't run accurate seeking tests" % path) + continue + + if media_info.get_num_tracks("video") < 1: + error("GstValidateCheckAccurateSeekingTestGenerator", + "%s is not a video, can't run accurate seeking tests" % path) + continue + + if media_info.get_num_tracks("video") < 1: + error("GstValidateCheckAccurateSeekingTestGenerator", + "No video track, can't run accurate seeking tests" % path) + continue + + if test_manager.options.validate_generate_ssim_reference_files: + scenario = None + test_name = media_info.get_clean_name() + '.generate_reference_files' + config = [ + 'validatessim, element-name="videoconvert", output-dir="%s"' % reference_frame_dir] + else: + test_name = media_info.get_clean_name() + framerate, scenario = cls.generate_scenario(test_manager.options, reference_frame_dir, media_info) + if scenario is None: + error("GstValidateCheckAccurateSeekingTestGenerator", + "Could not generate test for media info: %s" % path) + continue + + config = [ + '%(ssim)s, element-name="videoconvert", reference-images-dir="' + \ + reference_frame_dir + '", framerate=%d/%d' % (framerate.numerator, framerate.denominator) + ] + + + pipelines[test_name] = { + "pipeline": "uridecodebin uri=" + media_info.get_uri() + " ! deinterlace ! video/x-raw,interlace-mode=progressive ! videoconvert name=videoconvert ! %(videosink)s", + "media_info": media_info, + "config": config, + } + + if scenario: + pipelines[test_name]["scenarios"] = [scenario] + + return GstValidatePipelineTestsGenerator.from_dict(test_manager, name, pipelines, extra_data=extra_data) + + @classmethod + def generate_scenario(cls, options, reference_frame_dir, media_info): + actions = [ + "description, seek=true, handles-states=true, needs_preroll=true", + "pause", + ] + + framerate = None + for track_type, caps in media_info.get_tracks_caps(): + if track_type == 'video': + for struct, _ in GstCaps.new_from_str(caps): + framerate = struct["framerate"] + if framerate: + break + assert framerate + + n_frames = int((media_info.get_duration() * framerate.numerator) / (GST_SECOND * framerate.denominator)) + frames_timestamps = [math.ceil(i * framerate.denominator * GST_SECOND / framerate.numerator) for i in range(n_frames)] + # Ensure tests are not longer than long_limit, empirically considering we take 0.2 secs per frames. + acceptable_n_frames = options.long_limit * 5 + if n_frames > acceptable_n_frames: + n_frames_per_groups = int(acceptable_n_frames / 3) + frames_timestamps = frames_timestamps[0:n_frames_per_groups] \ + + frames_timestamps[int(n_frames / 2 - int(n_frames_per_groups / 2)):int(n_frames / 2 + int(n_frames_per_groups / 2))] \ + + frames_timestamps[-n_frames_per_groups:n_frames] + + actions += ['seek, flags=flush+accurate, start=(guint64)%s' % ts for ts in frames_timestamps] + actions += ['stop'] + + return framerate, { + "name": "check_accurate_seek", + "actions": actions, + } + + class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator): def __init__(self, name, test_manager, mixer, media_type, converter="", @@ -837,6 +933,7 @@ class GstValidateTestManager(GstValidateBaseTestManager): GstValidatePipelineTestsGenerator = GstValidatePipelineTestsGenerator GstValidatePlaybinTestsGenerator = GstValidatePlaybinTestsGenerator GstValidateMixerTestsGenerator = GstValidateMixerTestsGenerator + GstValidateCheckAccurateSeekingTestGenerator = GstValidateCheckAccurateSeekingTestGenerator GstValidateLaunchTest = GstValidateLaunchTest GstValidateMediaCheckTest = GstValidateMediaCheckTest GstValidateTranscodingTest = GstValidateTranscodingTest @@ -875,6 +972,9 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""") group.add_argument("--validate-enable-iqa-tests", dest="validate_enable_iqa_tests", help="Enable Image Quality Assessment validation tests.", default=False, action='store_true') + group.add_argument("--validate-generate-ssim-reference-files", + help="(re)generate ssim reference image files.", + default=False, action='store_true') def print_valgrind_bugs(self): # Look for all the 'pending' bugs in our supp file @@ -972,10 +1072,8 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""") if is_push or is_skipped: if not os.path.exists(media_info): continue - if is_push: uri = "push" + uri - args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ") args.append(uri) if os.path.isfile(media_info) and not self.options.update_media_info and not is_skipped: @@ -1076,7 +1174,7 @@ not been tested and explicitely activated if you set use --wanted-tests ALL""") except ValueError: pass - if options.validate_uris: + if options.validate_uris or options.validate_generate_ssim_reference_files: self.check_testslist = False super(GstValidateTestManager, self).set_settings( diff --git a/validate/launcher/baseclasses.py b/validate/launcher/baseclasses.py index a3b8ce469e..c356a5aef2 100644 --- a/validate/launcher/baseclasses.py +++ b/validate/launcher/baseclasses.py @@ -2537,7 +2537,6 @@ class GstValidateMediaDescriptor(MediaDescriptor): for stream in streams: self._track_caps.append( (stream.attrib["type"], stream.attrib["caps"])) - self._uri = media_xml.attrib["uri"] self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0))) self._has_frames = bool(int(media_xml.attrib["frame-detection"])) self._duration = int(media_xml.attrib["duration"]) @@ -2682,7 +2681,8 @@ class GstValidateMediaDescriptor(MediaDescriptor): def get_clean_name(self): name = os.path.basename(self.get_path()) - name = re.sub("\.stream_info|\.media_info", "", name) + regex = '|'.join(['\\.%s$' % ext for ext in [self.SKIPPED_MEDIA_INFO_EXT, self.MEDIA_INFO_EXT, self.PUSH_MEDIA_INFO_EXT, self.STREAM_INFO_EXT]]) + name = re.sub(regex, "", name) return name.replace('.', "_") diff --git a/validate/launcher/utils.py b/validate/launcher/utils.py index f6c0dedab9..1d89ee4cbe 100644 --- a/validate/launcher/utils.py +++ b/validate/launcher/utils.py @@ -24,6 +24,7 @@ except ImportError: from . import config import json +import numbers import os import platform import re @@ -38,10 +39,11 @@ import urllib.request import urllib.error import urllib.parse -from .loggable import Loggable +from .loggable import Loggable, warning from operator import itemgetter from xml.etree import ElementTree from collections import defaultdict +from fractions import Fraction GST_SECOND = int(1000000000) @@ -661,3 +663,1163 @@ def get_fakesink_for_media_type(media_type, needs_clock=False): return "fakesink sync=true" return "fakesink" + + +class InvalidValueError(ValueError): + """Received value is invalid""" + def __init__(self, name, value, expect): + ValueError.__init__( + self, "Invalid value {!r} for {}. Expect {}.".format( + value, name, expect)) + + +def wrong_type_for_arg(val, expect_type_name, arg_name): + """Raise exception in response to a wrong argument type""" + raise TypeError( + "Expect a {} type for the '{}' argument. Received a {} type." + "".format(expect_type_name, arg_name, type(val).__name__)) + + +class DeserializeError(Exception): + """Receive an incorrectly serialized value""" + MAX_LEN = 20 + + def __init__(self, read, reason): + if len(read) > self.MAX_LEN: + read = read[:self.MAX_LEN] + "..." + Exception.__init__( + self, "Could not deserialize the string ({}) because it {}." + "".format(read, reason)) + + +class GstStructure(Loggable): + """ + This implementation has been copied from OpenTimelineIO. + + Note that the types are to correspond to GStreamer/GES GTypes, + rather than python types. + + Current supported GTypes: + GType Associated Accepted + Python type aliases + ====================================== + gint int int, i + glong int + gint64 int + guint int uint, u + gulong int + guint64 int + gfloat float float, f + gdouble float double, d + gboolean bool boolean, + bool, b + string str or None str, s + GstFraction str or fraction + Fraction + GstStructure GstStructure structure + schema + GstCaps GstCaps + schema + + Note that other types can be given: these must be given as strings + and the user will be responsible for making sure they are already in + a serialized form. + """ + + INT_TYPES = ("int", "glong", "gint64") + UINT_TYPES = ("uint", "gulong", "guint64") + FLOAT_TYPES = ("float", "double") + BOOLEAN_TYPE = "boolean" + FRACTION_TYPE = "fraction" + STRING_TYPE = "string" + STRUCTURE_TYPE = "structure" + CAPS_TYPE = "GstCaps" + KNOWN_TYPES = INT_TYPES + UINT_TYPES + FLOAT_TYPES + ( + BOOLEAN_TYPE, FRACTION_TYPE, STRING_TYPE, STRUCTURE_TYPE, + CAPS_TYPE) + + TYPE_ALIAS = { + "i": "int", + "gint": "int", + "u": "uint", + "guint": "uint", + "f": "float", + "gfloat": "float", + "d": "double", + "gdouble": "double", + "b": BOOLEAN_TYPE, + "bool": BOOLEAN_TYPE, + "gboolean": BOOLEAN_TYPE, + "GstFraction": FRACTION_TYPE, + "str": STRING_TYPE, + "s": STRING_TYPE, + "GstStructure": STRUCTURE_TYPE + } + + def __init__(self, name=None, fields=None): + if name is None: + name = "Unnamed" + if fields is None: + fields = {} + if type(name) is not str: + wrong_type_for_arg(name, "str", "name") + self._check_name(name) + self.name = name + try: + fields = dict(fields) + except (TypeError, ValueError): + wrong_type_for_arg(fields, "dict", "fields") + self.fields = {} + for key in fields: + entry = fields[key] + if type(entry) is not tuple: + try: + entry = tuple(entry) + except (TypeError, ValueError): + raise TypeError( + "Expect dict to be filled with tuple-like " + "entries") + if len(entry) != 2: + raise TypeError( + "Expect dict to be filled with 2-entry tuples") + self.set(key, *entry) + + def __repr__(self): + return "GstStructure({!r}, {!r})".format(self.name, self.fields) + + UNKNOWN_PREFIX = "[UNKNOWN]" + + @classmethod + def _make_type_unknown(cls, _type): + return cls.UNKNOWN_PREFIX + _type + # note the sqaure brackets make the type break the TYPE_FORMAT + + @classmethod + def _is_unknown_type(cls, _type): + return _type[:len(cls.UNKNOWN_PREFIX)] == cls.UNKNOWN_PREFIX + + @classmethod + def _get_unknown_type(cls, _type): + return _type[len(cls.UNKNOWN_PREFIX):] + + def _field_to_str(self, key): + """Return field in a serialized form""" + _type, value = self.fields[key] + if type(key) is not str: + raise TypeError("Found a key that is not a str type") + if type(_type) is not str: + raise TypeError( + "Found a type name that is not a str type") + self._check_key(key) + _type = self.TYPE_ALIAS.get(_type, _type) + if self._is_unknown_type(_type): + _type = self._get_unknown_type(_type) + self._check_type(_type) + self._check_unknown_typed_value(value) + # already in serialized form + else: + self._check_type(_type) + value = self.serialize_value(_type, value) + return "{}=({}){}".format(key, _type, value) + + def _fields_to_str(self): + write = [] + for key in self.fields: + write.append(", {}".format(self._field_to_str(key))) + return "".join(write) + + def _name_to_str(self): + """Return the name in a serialized form""" + return self._check_name(self.name) + + def __str__(self): + """Emulates gst_structure_to_string""" + return "{}{};".format(self._name_to_str(), self._fields_to_str()) + + def get_type_name(self, key): + """Return the field type""" + _type = self.fields[key][0] + return _type + + def get_value(self, key): + """Return the field value""" + value = self.fields[key][1] + return value + + def __getitem__(self, key): + return self.get_value(key) + + def __len__(self): + return len(self.fields) + + @staticmethod + def _val_type_err(typ, val, expect): + raise TypeError( + "Received value ({!s}) is a {} rather than a {}, even " + "though the {} type was given".format( + val, type(val).__name__, expect, typ)) + + def set(self, key, _type, value): + """Set a field to the given typed value""" + if type(key) is not str: + wrong_type_for_arg(key, "str", "key") + if type(_type) is not str: + wrong_type_for_arg(_type, "str", "_type") + _type = self.TYPE_ALIAS.get(_type, _type) + if self.fields.get(key) == (_type, value): + return + self._check_key(key) + type_is_unknown = True + if self._is_unknown_type(_type): + # this can happen if the user is setting a GstStructure + # using a preexisting GstStructure, the type will then + # be passed and marked as unknown + _type = self._get_unknown_type(_type) + self._check_type(_type) + else: + self._check_type(_type) + if _type in self.INT_TYPES: + type_is_unknown = False + if not isinstance(value, int): + self._val_type_err(_type, value, "int") + elif _type in self.UINT_TYPES: + type_is_unknown = False + if not isinstance(value, int): + self._val_type_err(_type, value, "int") + if value < 0: + raise InvalidValueError( + "value", value, "a positive integer for {} " + "types".format(_type)) + elif _type in self.FLOAT_TYPES: + type_is_unknown = False + if type(value) is not float: + self._val_type_err(_type, value, "float") + elif _type == self.BOOLEAN_TYPE: + type_is_unknown = False + if type(value) is not bool: + self._val_type_err(_type, value, "bool") + elif _type == self.FRACTION_TYPE: + type_is_unknown = False + if type(value) is Fraction: + value = value + elif type(value) is str: + try: + Fraction(value) + except ValueError: + raise InvalidValueError( + "value", value, "a fraction for the {} " + "types".format(_type)) + else: + self._val_type_err(_type, value, "Fraction or str") + elif _type == self.STRING_TYPE: + type_is_unknown = False + if value is not None and type(value) is not str: + self._val_type_err(_type, value, "str or None") + elif _type == self.STRUCTURE_TYPE: + type_is_unknown = False + if not isinstance(value, GstStructure): + self._val_type_err(_type, value, "GstStructure") + elif _type == self.CAPS_TYPE: + type_is_unknown = False + if not isinstance(value, GstCaps): + self._val_type_err(_type, value, "GstCaps") + if type_is_unknown: + self._check_unknown_typed_value(value) + warning('GstStructure', + "The GstStructure type {} with the value ({}) is " + "unknown. The value will be stored and serialized as " + "given.".format(_type, value)) + _type = self._make_type_unknown(_type) + self.fields[key] = (_type, value) + + def get(self, key, default=None): + """Return the raw value associated with key""" + if key in self.fields: + value = self.get_value(key) + return value + return default + + def get_typed(self, key, expect_type, default=None): + """ + Return the raw value associated with key if its type matches. + Raises a warning if a value exists under key but is of the + wrong type. + """ + if type(expect_type) is not str: + wrong_type_for_arg(expect_type, "str", "expect_type") + expect_type = self.TYPE_ALIAS.get(expect_type, expect_type) + if key in self.fields: + type_name = self.get_type_name(key) + if expect_type == type_name: + value = self.get_value(key) + return value + warning('GstStructure', + "The structure {} contains a value under {}, but is " + "a {}, rather than the expected {} type".format( + self.name, key, type_name, expect_type)) + return default + + def values(self): + """Return a list of all values contained in the structure""" + return [self.get_value(key) for key in self.fields] + + def values_of_type(self, _type): + """ + Return a list of all values contained of the given type in the + structure + """ + if type(_type) is not str: + wrong_type_for_arg(_type, "str", "_type") + _type = self.TYPE_ALIAS.get(_type, _type) + return [self.get_value(key) for key in self.fields + if self.get_type_name(key) == _type] + + ASCII_SPACES = r"(\\?[ \t\n\r\f\v])*" + END_FORMAT = r"(?P" + ASCII_SPACES + r")" + NAME_FORMAT = r"(?P[a-zA-Z][a-zA-Z0-9/_.:-]*)" + # ^Format requirement for the name of a GstStructure + SIMPLE_STRING = r"[a-zA-Z0-9_+/:.-]+" + # see GST_ASCII_CHARS (below) + KEY_FORMAT = r"(?P" + SIMPLE_STRING + r")" + # NOTE: GstStructure technically allows more general keys, but + # these can break the parsing. + TYPE_FORMAT = r"(?P" + SIMPLE_STRING + r")" + BASIC_VALUE_FORMAT = \ + r'(?P("(\\.|[^"])*")|(' + SIMPLE_STRING + r'))' + # consume simple string or a string between quotes. Second will + # consume anything that is escaped, including a '"' + # NOTE: \\. is used rather than \\" since: + # + '"start\"end;"' should be captured as '"start\"end"' since + # the '"' is escaped. + # + '"start\\"end;"' should be captured as '"start\\"' since the + # '\' is escaped, not the '"' + # In the fist case \\. will consume '\"', and in the second it will + # consumer '\\', as desired. The second would not work with just \\" + + @staticmethod + def _check_against_regex(check, regex, name): + if not regex.fullmatch(check): + raise InvalidValueError( + name, check, "to match the regular expression {}" + "".format(regex.pattern)) + + NAME_REGEX = re.compile(NAME_FORMAT) + KEY_REGEX = re.compile(KEY_FORMAT) + TYPE_REGEX = re.compile(TYPE_FORMAT) + + @classmethod + def _check_name(cls, name): + cls._check_against_regex(name, cls.NAME_REGEX, "name") + + @classmethod + def _check_key(cls, key): + cls._check_against_regex(key, cls.KEY_REGEX, "key") + + @classmethod + def _check_type(cls, _type): + cls._check_against_regex(_type, cls.TYPE_REGEX, "type") + + @classmethod + def _check_unknown_typed_value(cls, value): + if type(value) is not str: + cls._val_type_err("unknown", value, "string") + try: + # see if the value could be successfully parsed in again + ret_type, ret_val, _ = cls._parse_value(value, False) + except DeserializeError as err: + raise InvalidValueError( + "value", value, "unknown-typed values to be in a " + "serialized format ({!s})".format(err)) + else: + if ret_type is not None: + raise InvalidValueError( + "value", value, "unknown-typed values to *not* " + "start with a type specification, only the " + "serialized value should be given") + if ret_val != value: + raise InvalidValueError( + "value", value, "unknown-typed values to be the " + "same as its parsed value {}".format(ret_val)) + + PARSE_NAME_REGEX = re.compile( + ASCII_SPACES + NAME_FORMAT + END_FORMAT) + + @classmethod + def _parse_name(cls, read): + match = cls.PARSE_NAME_REGEX.match(read) + if match is None: + raise DeserializeError( + read, "does not start with a correct name") + name = match.group("name") + read = read[match.end("end"):] + return name, read + + @classmethod + def _parse_range_list_array(cls, read): + start = read[0] + end = {'[': ']', '{': '}', '<': '>'}.get(start) + read = read[1:] + values = [start, ' '] + first = True + while read and read[0] != end: + if first: + first = False + else: + if read and read[0] != ',': + DeserializeError( + read, "does not contain a comma between listed " + "items") + values.append(", ") + read = read[1:] + _type, value, read = cls._parse_value(read, False) + if _type is not None: + if cls._is_unknown_type(_type): + # remove unknown marker for serialization + _type = cls._get_unknown_type(_type) + values.extend(('(', _type, ')')) + values.append(value) + if not read: + raise DeserializeError( + read, "ended before {} could be found".format(end)) + read = read[1:] # skip past 'end' + match = cls.END_REGEX.match(read) # skip whitespace + read = read[match.end("end"):] + # NOTE: we are ignoring the incorrect cases where a range + # has 0, 1 or 4+ values! This is the users responsiblity. + values.extend((' ', end)) + return "".join(values), read + + FIELD_START_REGEX = re.compile( + ASCII_SPACES + KEY_FORMAT + ASCII_SPACES + r"=" + END_FORMAT) + FIELD_TYPE_REGEX = re.compile( + ASCII_SPACES + r"(\(" + ASCII_SPACES + TYPE_FORMAT + + ASCII_SPACES + r"\))?" + END_FORMAT) + FIELD_VALUE_REGEX = re.compile( + ASCII_SPACES + BASIC_VALUE_FORMAT + END_FORMAT) + END_REGEX = re.compile(END_FORMAT) + + @classmethod + def _parse_value(cls, read, deserialize=True): + match = cls.FIELD_TYPE_REGEX.match(read) + # match shouldn't be None since the (TYPE_FORMAT) is optional + # and the rest is just ASCII_SPACES + _type = match.group("type") + if _type is None and deserialize: + # if deserialize is False, the (type) is optional + raise DeserializeError( + read, "does not contain a valid '(type)' format") + _type = cls.TYPE_ALIAS.get(_type, _type) + type_is_unknown = True + read = read[match.end("end"):] + if read and read[0] in ('[', '{', '<'): + # range/list/array types + # this is an unknown type, even though _type itself may + # be known. e.g. a list on integers will have _type as 'int' + # but the corresponding value can not be deserialized as an + # integer + value, read = cls._parse_range_list_array(read) + if deserialize: + # prevent printing on subsequent calls if we find a + # list within a list, etc. + warning('GstStructure', + "GstStructure received a range/list/array of type " + "{}, which can not be deserialized. Storing the " + "value as {}.".format(_type, value)) + else: + match = cls.FIELD_VALUE_REGEX.match(read) + if match is None: + raise DeserializeError( + read, "does not have a valid value format") + read = read[match.end("end"):] + value = match.group("value") + if deserialize: + if _type in cls.KNOWN_TYPES: + type_is_unknown = False + try: + value = cls.deserialize_value(_type, value) + except DeserializeError as err: + raise DeserializeError( + read, "contains an invalid typed value " + "({!s})".format(err)) + else: + warning('GstStructure', + "GstStructure found a type {} that is unknown. " + "The corresponding value ({}) will not be " + "deserialized and will be stored as given." + "".format(_type, value)) + if type_is_unknown and _type is not None: + _type = cls._make_type_unknown(_type) + return _type, value, read + + @classmethod + def _parse_field(cls, read): + match = cls.FIELD_START_REGEX.match(read) + if match is None: + raise DeserializeError( + read, "does not have a valid 'key=...' format") + key = match.group("key") + read = read[match.end("end"):] + _type, value, read = cls._parse_value(read) + return key, _type, value, read + + @classmethod + def _parse_fields(cls, read): + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + fields = {} + while read and read[0] != ';': + if read and read[0] != ',': + DeserializeError( + read, "does not separate fields with commas") + read = read[1:] + key, _type, value, read = cls._parse_field(read) + fields[key] = (_type, value) + if read: + # read[0] == ';' + read = read[1:] + return fields, read + + @classmethod + def new_from_str(cls, read): + """ + Returns a new instance of GstStructure, based on the Gst library + function gst_structure_from_string. + Strings obtained from the GstStructure str() method can be + parsed in to recreate the original GstStructure. + """ + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + name, read = cls._parse_name(read) + fields = cls._parse_fields(read)[0] + return GstStructure(name=name, fields=fields) + + @staticmethod + def _val_read_err(typ, val): + raise DeserializeError( + val, "does not translated to the {} type".format(typ)) + + @classmethod + def deserialize_value(cls, _type, value): + """Return the value as the corresponding type""" + if type(_type) is not str: + wrong_type_for_arg(_type, "str", "_type") + if type(value) is not str: + wrong_type_for_arg(value, "str", "value") + _type = cls.TYPE_ALIAS.get(_type, _type) + if _type in cls.INT_TYPES or _type in cls.UINT_TYPES: + try: + value = int(value) + except ValueError: + cls._val_read_err(_type, value) + if _type in cls.UINT_TYPES and value < 0: + cls._val_read_err(_type, value) + elif _type in cls.FLOAT_TYPES: + try: + value = float(value) + except ValueError: + cls._val_read_err(_type, value) + elif _type == cls.BOOLEAN_TYPE: + try: + value = cls.deserialize_boolean(value) + except DeserializeError: + cls._val_read_err(_type, value) + elif _type == cls.FRACTION_TYPE: + try: + value = Fraction(value) + except ValueError: + cls._val_read_err(_type, value) + elif _type == cls.STRING_TYPE: + try: + value = cls.deserialize_string(value) + except DeserializeError as err: + raise DeserializeError( + value, "does not translate to a string ({!s})" + "".format(err)) + elif _type == cls.STRUCTURE_TYPE: + try: + value = cls.deserialize_structure(value) + except DeserializeError as err: + raise DeserializeError( + value, "does not translate to a GstStructure ({!s})" + "".format(err)) + elif _type == cls.CAPS_TYPE: + try: + value = cls.deserialize_caps(value) + except DeserializeError as err: + raise DeserializeError( + value, "does not translate to a GstCaps ({!s})" + "".format(err)) + else: + raise ValueError( + "The type {} is unknown, so the value ({}) can not " + "be deserialized.".format(_type, value)) + return value + + @classmethod + def serialize_value(cls, _type, value): + """Serialize the typed value as a string""" + if type(_type) is not str: + wrong_type_for_arg(_type, "str", "_type") + _type = cls.TYPE_ALIAS.get(_type, _type) + if _type in cls.INT_TYPES + cls.UINT_TYPES + cls.FLOAT_TYPES \ + + (cls.FRACTION_TYPE, ): + return str(value) + if _type == cls.BOOLEAN_TYPE: + return cls.serialize_boolean(value) + if _type == cls.STRING_TYPE: + return cls.serialize_string(value) + if _type == cls.STRUCTURE_TYPE: + return cls.serialize_structure(value) + if _type == cls.CAPS_TYPE: + return cls.serialize_caps(value) + raise ValueError( + "The type {} is unknown, so the value ({}) can not be " + "serialized.".format(_type, str(value))) + + # see GST_ASCII_IS_STRING in gst_private.h + GST_ASCII_CHARS = [ + ord(l) for l in "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "0123456789" + "_-+/:." + ] + LEADING_OCTAL_CHARS = [ord(l) for l in "0123"] + OCTAL_CHARS = [ord(l) for l in "01234567"] + + @classmethod + def serialize_string(cls, value): + """ + Emulates gst_value_serialize_string. + Accepts a bytes, str or None type. + Returns a str type. + """ + if value is not None and type(value) is not str: + wrong_type_for_arg(value, "None or str", "value") + return cls._wrap_string(value) + + @classmethod + def _wrap_string(cls, read): + if read is None: + return "NULL" + if read == "NULL": + return "\"NULL\"" + if type(read) is bytes: + pass + elif type(read) is str: + read = read.encode() + else: + wrong_type_for_arg(read, "None, str, or bytes", "read") + if not read: + return '""' + added_wrap = False + ser_string_list = [] + for byte in read: + if byte in cls.GST_ASCII_CHARS: + ser_string_list.append(chr(byte)) + elif byte < 0x20 or byte >= 0x7f: + ser_string_list.append("\\{:03o}".format(byte)) + added_wrap = True + else: + ser_string_list.append("\\" + chr(byte)) + added_wrap = True + if added_wrap: + ser_string_list.insert(0, '"') + ser_string_list.append('"') + return "".join(ser_string_list) + + @classmethod + def deserialize_string(cls, read): + """ + Emulates gst_value_deserialize_string. + Accepts a str type. + Returns a str or None type. + """ + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + if read == "NULL": + return None + if not read: + return "" + if read[0] != '"' or read[-1] != '"': + return read + return cls._unwrap_string(read) + + @classmethod + def _unwrap_string(cls, read): + """Emulates gst_string_unwrap""" + if type(read) is not bytes: + read_array = read.encode() + byte_list = [] + bytes_iter = iter(read_array) + + def next_byte(): + try: + return next(bytes_iter) + except StopIteration: + raise DeserializeError(read, "end unexpectedly") + + byte = next_byte() + if byte != ord('"'): + raise DeserializeError( + read, "does not start with '\"', but ends with '\"'") + while True: + byte = next_byte() + if byte in cls.GST_ASCII_CHARS: + byte_list.append(byte) + elif byte == ord('"'): + try: + next(bytes_iter) + except StopIteration: + # expect there to be no more bytes + break + raise DeserializeError( + read, "contains an un-escaped '\"' before the end") + elif byte == ord('\\'): + byte = next_byte() + if byte in cls.LEADING_OCTAL_CHARS: + # could be the start of an octal + byte2 = next_byte() + byte3 = next_byte() + if byte2 in cls.OCTAL_CHARS and byte3 in cls.OCTAL_CHARS: + nums = [b - ord('0') for b in (byte, byte2, byte3)] + byte = (nums[0] << 6) + (nums[1] << 3) + nums[2] + byte_list.append(byte) + else: + raise DeserializeError( + read, "contains the start of an octal " + "sequence but not the end") + else: + if byte == 0: + raise DeserializeError( + read, "contains a null byte after an escape") + byte_list.append(byte) + else: + raise DeserializeError( + read, "contains an unexpected un-escaped character") + out_str = bytes(bytearray(byte_list)) + try: + return out_str.decode() + except (UnicodeError, ValueError): + raise DeserializeError( + read, "contains invalid utf-8 byte sequences") + + @staticmethod + def serialize_boolean(value): + """ + Emulates gst_value_serialize_boolean. + Accepts bool type. + Returns a str type. + """ + if type(value) is not bool: + wrong_type_for_arg(value, "bool", "value") + if value: + return "true" + return "false" + + @staticmethod + def deserialize_boolean(read): + """ + Emulates gst_value_deserialize_boolean. + Accepts str type. + Returns a bool type. + """ + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + if read.lower() in ("true", "t", "yes", "1"): + return True + if read.lower() in ("false", "f", "no", "0"): + return False + raise DeserializeError(read, "is an unknown boolean value") + + @classmethod + def serialize_structure(cls, value): + """ + Emulates gst_value_serialize_structure. + Accepts a GstStructure. + Returns a str type. + """ + if not isinstance(value, GstStructure): + wrong_type_for_arg(value, "GstStructure", "value") + return cls._wrap_string(str(value)) + + @classmethod + def deserialize_structure(cls, read): + """ + Emulates gst_value_serialize_structure. + Accepts a str type. + Returns a GstStructure. + """ + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + if read[0] == '"': + # NOTE: since all GstStructure strings end with ';', we + # don't ever expect the above to *not* be true, but the + # GStreamer library allows for this case + try: + read = cls._unwrap_string(read) + # NOTE: in the GStreamer library, serialized + # GstStructure and GstCaps strings are sent to + # _priv_gst_value_parse_string with unescape set to + # TRUE. What this essentially does is replace "\x" with + # just "x". Since caps and structure strings should only + # contain printable ascii characters before they are + # passed to _wrap_string, this should be equivalent to + # calling _unwrap_string. Our method is more clearly a + # reverse of the serialization method. + except DeserializeError as err: + raise DeserializeError( + read, "could not be unwrapped as a string ({!s})" + "".format(err)) + return GstStructure.new_from_str(read) + + @classmethod + def serialize_caps(cls, value): + """ + Emulates gst_value_serialize_caps. + Accepts a GstCaps. + Returns a str type. + """ + if not isinstance(value, GstCaps): + wrong_type_for_arg(value, "GstCaps", "value") + return cls._wrap_string(str(value)) + + @classmethod + def deserialize_caps(cls, read): + """ + Emulates gst_value_serialize_caps. + Accepts a str type. + Returns a GstCaps. + """ + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + if read[0] == '"': + # can be not true if a caps only contains a single empty + # structure, or is ALL or NONE + try: + read = cls._unwrap_string(read) + except DeserializeError as err: + raise DeserializeError( + read, "could not be unwrapped as a string ({!s})" + "".format(err)) + return GstCaps.new_from_str(read) + + @staticmethod + def _escape_string(read): + """ + Emulates some of g_strescape's behaviour in + ges_marker_list_serialize + """ + # NOTE: in the original g_strescape, all the special characters + # '\b', '\f', '\n', '\r', '\t', '\v', '\' and '"' are escaped, + # and all characters in the range 0x01-0x1F and non-ascii + # characters are replaced by an octal sequence + # (similar to _wrap_string). + # However, a caps string should only contain printable ascii + # characters, so it should be sufficient to simply escape '\' + # and '"'. + escaped = ['"'] + for character in read: + if character in ('"', '\\'): + escaped.append('\\') + escaped.append(character) + escaped.append('"') + return "".join(escaped) + + @staticmethod + def _unescape_string(read): + """ + Emulates behaviour of _priv_gst_value_parse_string with + unescape set to TRUE. This should undo _escape_string + """ + if read[0] != '"': + return read + character_iter = iter(read) + + def next_char(): + try: + return next(character_iter) + except StopIteration: + raise DeserializeError(read, "ends unexpectedly") + + next_char() # skip '"' + unescaped = [] + while True: + character = next_char() + if character == '"': + break + if character == '\\': + unescaped.append(next_char()) + else: + unescaped.append(character) + return "".join(unescaped) + + +class GstCapsFeatures(): + """ + Mimicking a GstCapsFeatures. + """ + def __init__(self, *features): + """ + Initialize the GstCapsFeatures. + + 'features' should be a series of feature names as strings. + """ + self.is_any = False + self.features = [] + for feature in features: + if type(feature) is not str: + wrong_type_for_arg(feature, "strs", "features") + self._check_feature(feature) + self.features.append(feature) + # NOTE: if 'features' is a str, rather than a list of strs + # then this will iterate through all of its characters! But, + # a single character can not match the feature regular + # expression. + + def __getitem__(self, index): + return self.features[index] + + def __len__(self): + return len(self.features) + + @classmethod + def new_any(cls): + features = cls() + features.is_any = True + return features + + # Based on gst_caps_feature_name_is_valid + FEATURE_FORMAT = r"(?P[a-zA-Z]*:[a-zA-Z][a-zA-Z0-9]*)" + FEATURE_REGEX = re.compile(FEATURE_FORMAT) + + @classmethod + def _check_feature(cls, feature): + if not cls.FEATURE_REGEX.fullmatch(feature): + raise InvalidValueError( + "feature", feature, "to match the regular expression " + "{}".format(cls.FEATURE_REGEX.pattern)) + + PARSE_FEATURE_REGEX = re.compile( + r" *" + FEATURE_FORMAT + "(?P)") + + @classmethod + def new_from_str(cls, read): + """ + Returns a new instance of GstCapsFeatures, based on the Gst + library function gst_caps_features_from_string. + Strings obtained from the GstCapsFeatures str() method can be + parsed in to recreate the original GstCapsFeatures. + """ + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + if read == "ANY": + return cls.new_any() + first = True + features = [] + while read: + if first: + first = False + else: + if read[0] != ',': + DeserializeError( + read, "does not separate features with commas") + read = read[1:] + match = cls.PARSE_FEATURE_REGEX.match(read) + if match is None: + raise DeserializeError( + read, "does not match the regular expression {}" + "".format(cls.PARSE_FEATURE_REGEX.pattern)) + features.append(match.group("feature")) + read = read[match.end("end"):] + return cls(*features) + + def __repr__(self): + if self.is_any: + return "GstCapsFeatures.new_any()" + write = ["GstCapsFeatures("] + first = True + for feature in self.features: + if first: + first = False + else: + write.append(", ") + write.append(repr(feature)) + write.append(")") + return "".join(write) + + def __str__(self): + """Emulate gst_caps_features_to_string""" + if not self.features and self.is_any: + return "ANY" + write = [] + first = True + for feature in self.features: + if type(feature) is not str: + raise TypeError( + "Found a feature that is not a str type") + if first: + first = False + else: + write.append(", ") + write.append(feature) + return "".join(write) + + +class GstCaps: + GST_CAPS_FLAG_ANY = 1 << 4 + # from GST_MINI_OBJECT_FLAG_LAST + + def __init__(self, *structs): + """ + Initialize the GstCaps. + + 'structs' should be a series of GstStructures, and + GstCapsFeatures pairs: + struct0, features0, struct1, features1, ... + None may be given in place of a GstCapsFeatures, in which case + an empty features is assigned to the structure. + + Note, this instance will need to take ownership of any given + GstStructure or GstCapsFeatures. + """ + if len(structs) % 2: + raise InvalidValueError( + "*structs", structs, "an even number of arguments") + self.flags = 0 + self.structs = [] + struct = None + for index, arg in enumerate(structs): + if index % 2 == 0: + struct = arg + else: + self.append(struct, arg) + + def get_structure(self, index): + """Return the GstStructure at the given index""" + return self.structs[index][0] + + def get_features(self, index): + """Return the GstStructure at the given index""" + return self.structs[index][1] + + def __getitem__(self, index): + return self.get_structure(index) + + def __len__(self): + return len(self.structs) + + def __iter__(self): + for s in self.structs: + yield s + + @classmethod + def new_any(cls): + caps = cls() + caps.flags = cls.GST_CAPS_FLAG_ANY + return caps + + def is_any(self): + return self.flags & self.GST_CAPS_FLAG_ANY != 0 + + FEATURES_FORMAT = r"\((?P[^)]*)\)" + NAME_FEATURES_REGEX = re.compile( + GstStructure.ASCII_SPACES + GstStructure.NAME_FORMAT + + r"(" + FEATURES_FORMAT + r")?" + GstStructure.END_FORMAT) + + @classmethod + def new_from_str(cls, read): + """ + Returns a new instance of GstCaps, based on the Gst library + function gst_caps_from_string. + Strings obtained from the GstCaps str() method can be parsed in + to recreate the original GstCaps. + """ + if type(read) is not str: + wrong_type_for_arg(read, "str", "read") + if read == "ANY": + return cls.new_any() + if read in ("EMPTY", "NONE"): + return cls() + structs = [] + # restriction-caps is otherwise serialized in the format: + # "struct-name-nums(feature), " + # "field1=(type1)val1, field2=(type2)val2; " + # "struct-name-alphas(feature), " + # "fieldA=(typeA)valA, fieldB=(typeB)valB" + # Note the lack of ';' for the last structure, and the + # '(feature)' is optional. + # + # NOTE: gst_caps_from_string also accepts: + # "struct-name(feature" + # without the final ')', but this must be the end of the string, + # but we will require that this final ')' is still given + while read: + match = cls.NAME_FEATURES_REGEX.match(read) + if match is None: + raise DeserializeError( + read, "does not match the regular expression {}" + "".format(cls.NAME_FEATURE_REGEX.pattern)) + read = read[match.end("end"):] + name = match.group("name") + features = match.group("features") + # NOTE: features may be None since the features part of the + # regular expression is optional + if features is None: + features = GstCapsFeatures() + else: + features = GstCapsFeatures.new_from_str(features) + fields, read = GstStructure._parse_fields(read) + structs.append(GstStructure(name, fields)) + structs.append(features) + return cls(*structs) + + def __repr__(self): + if self.is_any(): + return "GstCaps.new_any()" + write = ["GstCaps("] + first = True + for struct in self.structs: + if first: + first = False + else: + write.append(", ") + write.append(repr(struct[0])) + write.append(", ") + write.append(repr(struct[1])) + write.append(")") + return "".join(write) + + def __str__(self): + """Emulate gst_caps_to_string""" + if self.is_any(): + return "ANY" + if not self.structs: + return "EMPTY" + first = True + write = [] + for struct, features in self.structs: + if first: + first = False + else: + write.append("; ") + write.append(struct._name_to_str()) + if features.is_any or features.features: + # NOTE: is gst_caps_to_string, the feature will not + # be written if it only contains the + # GST_FEATURE_MEMORY_SYSTEM_MEMORY feature, since this + # considered equal to being an empty features. + # We do not seem to require this behaviour + write.append("({!s})".format(features)) + write.append(struct._fields_to_str()) + return "".join(write) + + def append(self, structure, features=None): + """Append a structure with the given features""" + if not isinstance(structure, GstStructure): + wrong_type_for_arg(structure, "GstStructure", "structure") + if features is None: + features = GstCapsFeatures() + if not isinstance(features, GstCapsFeatures): + wrong_type_for_arg( + features, "GstCapsFeatures or None", "features") + self.structs.append((structure, features))