mirror of
https://github.com/searxng/searxng.git
synced 2025-01-10 10:25:25 +00:00
381 lines
12 KiB
Python
381 lines
12 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Configuration class :py:class:`Config` with deep-update, schema validation
|
|
and deprecated names.
|
|
|
|
The :py:class:`Config` class implements a configuration that is based on
|
|
structured dictionaries. The configuration schema is defined in a dictionary
|
|
structure and the configuration data is given in a dictionary structure.
|
|
"""
|
|
from __future__ import annotations
|
|
from typing import Any
|
|
|
|
import copy
|
|
import typing
|
|
import logging
|
|
import pathlib
|
|
|
|
from ..compat import tomllib
|
|
|
|
__all__ = ['Config', 'UNSET', 'SchemaIssue']
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class FALSE:
|
|
"""Class of ``False`` singleton"""
|
|
|
|
# pylint: disable=multiple-statements
|
|
def __init__(self, msg):
|
|
self.msg = msg
|
|
|
|
def __bool__(self):
|
|
return False
|
|
|
|
def __str__(self):
|
|
return self.msg
|
|
|
|
__repr__ = __str__
|
|
|
|
|
|
UNSET = FALSE('<UNSET>')
|
|
|
|
|
|
class SchemaIssue(ValueError):
|
|
"""Exception to store and/or raise a message from a schema issue."""
|
|
|
|
def __init__(self, level: typing.Literal['warn', 'invalid'], msg: str):
|
|
self.level = level
|
|
super().__init__(msg)
|
|
|
|
def __str__(self):
|
|
return f"[cfg schema {self.level}] {self.args[0]}"
|
|
|
|
|
|
class Config:
|
|
"""Base class used for configuration"""
|
|
|
|
UNSET = UNSET
|
|
|
|
@classmethod
|
|
def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict) -> Config:
|
|
|
|
# init schema
|
|
|
|
log.debug("load schema file: %s", schema_file)
|
|
cfg = cls(cfg_schema=toml_load(schema_file), deprecated=deprecated)
|
|
if not cfg_file.exists():
|
|
log.warning("missing config file: %s", cfg_file)
|
|
return cfg
|
|
|
|
# load configuration
|
|
|
|
log.debug("load config file: %s", cfg_file)
|
|
upd_cfg = toml_load(cfg_file)
|
|
|
|
is_valid, issue_list = cfg.validate(upd_cfg)
|
|
for msg in issue_list:
|
|
log.error(str(msg))
|
|
if not is_valid:
|
|
raise TypeError(f"schema of {cfg_file} is invalid!")
|
|
cfg.update(upd_cfg)
|
|
return cfg
|
|
|
|
def __init__(self, cfg_schema: typing.Dict, deprecated: typing.Dict[str, str]):
|
|
"""Constructor of class Config.
|
|
|
|
:param cfg_schema: Schema of the configuration
|
|
:param deprecated: dictionary that maps deprecated configuration names to a messages
|
|
|
|
These values are needed for validation, see :py:obj:`validate`.
|
|
|
|
"""
|
|
self.cfg_schema = cfg_schema
|
|
self.deprecated = deprecated
|
|
self.cfg = copy.deepcopy(cfg_schema)
|
|
|
|
def __getitem__(self, key: str) -> Any:
|
|
return self.get(key)
|
|
|
|
def validate(self, cfg: dict):
|
|
"""Validation of dictionary ``cfg`` on :py:obj:`Config.SCHEMA`.
|
|
Validation is done by :py:obj:`validate`."""
|
|
|
|
return validate(self.cfg_schema, cfg, self.deprecated)
|
|
|
|
def update(self, upd_cfg: dict):
|
|
"""Update this configuration by ``upd_cfg``."""
|
|
|
|
dict_deepupdate(self.cfg, upd_cfg)
|
|
|
|
def default(self, name: str):
|
|
"""Returns default value of field ``name`` in ``self.cfg_schema``."""
|
|
return value(name, self.cfg_schema)
|
|
|
|
def get(self, name: str, default: Any = UNSET, replace: bool = True) -> Any:
|
|
"""Returns the value to which ``name`` points in the configuration.
|
|
|
|
If there is no such ``name`` in the config and the ``default`` is
|
|
:py:obj:`UNSET`, a :py:obj:`KeyError` is raised.
|
|
"""
|
|
|
|
parent = self._get_parent_dict(name)
|
|
val = parent.get(name.split('.')[-1], UNSET)
|
|
if val is UNSET:
|
|
if default is UNSET:
|
|
raise KeyError(name)
|
|
val = default
|
|
|
|
if replace and isinstance(val, str):
|
|
val = val % self
|
|
return val
|
|
|
|
def set(self, name: str, val):
|
|
"""Set the value to which ``name`` points in the configuration.
|
|
|
|
If there is no such ``name`` in the config, a :py:obj:`KeyError` is
|
|
raised.
|
|
"""
|
|
parent = self._get_parent_dict(name)
|
|
parent[name.split('.')[-1]] = val
|
|
|
|
def _get_parent_dict(self, name):
|
|
parent_name = '.'.join(name.split('.')[:-1])
|
|
if parent_name:
|
|
parent = value(parent_name, self.cfg)
|
|
else:
|
|
parent = self.cfg
|
|
if (parent is UNSET) or (not isinstance(parent, dict)):
|
|
raise KeyError(parent_name)
|
|
return parent
|
|
|
|
def path(self, name: str, default=UNSET):
|
|
"""Get a :py:class:`pathlib.Path` object from a config string."""
|
|
|
|
val = self.get(name, default)
|
|
if val is UNSET:
|
|
if default is UNSET:
|
|
raise KeyError(name)
|
|
return default
|
|
return pathlib.Path(str(val))
|
|
|
|
def pyobj(self, name, default=UNSET):
|
|
"""Get python object referred by full qualiffied name (FQN) in the config
|
|
string."""
|
|
|
|
fqn = self.get(name, default)
|
|
if fqn is UNSET:
|
|
if default is UNSET:
|
|
raise KeyError(name)
|
|
return default
|
|
(modulename, name) = str(fqn).rsplit('.', 1)
|
|
m = __import__(modulename, {}, {}, [name], 0)
|
|
return getattr(m, name)
|
|
|
|
|
|
def toml_load(file_name):
|
|
try:
|
|
with open(file_name, "rb") as f:
|
|
return tomllib.load(f)
|
|
except tomllib.TOMLDecodeError as exc:
|
|
msg = str(exc).replace('\t', '').replace('\n', ' ')
|
|
log.error("%s: %s", file_name, msg)
|
|
raise
|
|
|
|
|
|
# working with dictionaries
|
|
|
|
|
|
def value(name: str, data_dict: dict):
|
|
"""Returns the value to which ``name`` points in the ``dat_dict``.
|
|
|
|
.. code: python
|
|
|
|
>>> data_dict = {
|
|
"foo": {"bar": 1 },
|
|
"bar": {"foo": 2 },
|
|
"foobar": [1, 2, 3],
|
|
}
|
|
>>> value('foobar', data_dict)
|
|
[1, 2, 3]
|
|
>>> value('foo.bar', data_dict)
|
|
1
|
|
>>> value('foo.bar.xxx', data_dict)
|
|
<UNSET>
|
|
|
|
"""
|
|
|
|
ret_val = data_dict
|
|
for part in name.split('.'):
|
|
if isinstance(ret_val, dict):
|
|
ret_val = ret_val.get(part, UNSET)
|
|
if ret_val is UNSET:
|
|
break
|
|
return ret_val
|
|
|
|
|
|
def validate(
|
|
schema_dict: typing.Dict, data_dict: typing.Dict, deprecated: typing.Dict[str, str]
|
|
) -> typing.Tuple[bool, list]:
|
|
"""Deep validation of dictionary in ``data_dict`` against dictionary in
|
|
``schema_dict``. Argument deprecated is a dictionary that maps deprecated
|
|
configuration names to a messages::
|
|
|
|
deprecated = {
|
|
"foo.bar" : "config 'foo.bar' is deprecated, use 'bar.foo'",
|
|
"..." : "..."
|
|
}
|
|
|
|
The function returns a python tuple ``(is_valid, issue_list)``:
|
|
|
|
``is_valid``:
|
|
A bool value indicating ``data_dict`` is valid or not.
|
|
|
|
``issue_list``:
|
|
A list of messages (:py:obj:`SchemaIssue`) from the validation::
|
|
|
|
[schema warn] data_dict: deprecated 'fontlib.foo': <DEPRECATED['foo.bar']>
|
|
[schema invalid] data_dict: key unknown 'fontlib.foo'
|
|
[schema invalid] data_dict: type mismatch 'fontlib.foo': expected ..., is ...
|
|
|
|
If ``schema_dict`` or ``data_dict`` is not a dictionary type a
|
|
:py:obj:`SchemaIssue` is raised.
|
|
|
|
"""
|
|
names = []
|
|
is_valid = True
|
|
issue_list = []
|
|
|
|
if not isinstance(schema_dict, dict):
|
|
raise SchemaIssue('invalid', "schema_dict is not a dict type")
|
|
if not isinstance(data_dict, dict):
|
|
raise SchemaIssue('invalid', f"data_dict issue{'.'.join(names)} is not a dict type")
|
|
|
|
is_valid, issue_list = _validate(names, issue_list, schema_dict, data_dict, deprecated)
|
|
return is_valid, issue_list
|
|
|
|
|
|
def _validate(
|
|
names: typing.List,
|
|
issue_list: typing.List,
|
|
schema_dict: typing.Dict,
|
|
data_dict: typing.Dict,
|
|
deprecated: typing.Dict[str, str],
|
|
) -> typing.Tuple[bool, typing.List]:
|
|
|
|
is_valid = True
|
|
|
|
for key, data_value in data_dict.items():
|
|
|
|
names.append(key)
|
|
name = '.'.join(names)
|
|
|
|
deprecated_msg = deprecated.get(name)
|
|
# print("XXX %s: key %s // data_value: %s" % (name, key, data_value))
|
|
if deprecated_msg:
|
|
issue_list.append(SchemaIssue('warn', f"data_dict '{name}': deprecated - {deprecated_msg}"))
|
|
|
|
schema_value = value(name, schema_dict)
|
|
# print("YYY %s: key %s // schema_value: %s" % (name, key, schema_value))
|
|
if schema_value is UNSET:
|
|
if not deprecated_msg:
|
|
issue_list.append(SchemaIssue('invalid', f"data_dict '{name}': key unknown in schema_dict"))
|
|
is_valid = False
|
|
|
|
elif type(schema_value) != type(data_value): # pylint: disable=unidiomatic-typecheck
|
|
issue_list.append(
|
|
SchemaIssue(
|
|
'invalid',
|
|
(f"data_dict: type mismatch '{name}':" f" expected {type(schema_value)}, is: {type(data_value)}"),
|
|
)
|
|
)
|
|
is_valid = False
|
|
|
|
elif isinstance(data_value, dict):
|
|
_valid, _ = _validate(names, issue_list, schema_dict, data_value, deprecated)
|
|
is_valid = is_valid and _valid
|
|
names.pop()
|
|
|
|
return is_valid, issue_list
|
|
|
|
|
|
def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
|
|
"""Deep-update of dictionary in ``base_dict`` by dictionary in ``upd_dict``.
|
|
|
|
For each ``upd_key`` & ``upd_val`` pair in ``upd_dict``:
|
|
|
|
0. If types of ``base_dict[upd_key]`` and ``upd_val`` do not match raise a
|
|
:py:obj:`TypeError`.
|
|
|
|
1. If ``base_dict[upd_key]`` is a dict: recursively deep-update it by ``upd_val``.
|
|
|
|
2. If ``base_dict[upd_key]`` not exist: set ``base_dict[upd_key]`` from a
|
|
(deep-) copy of ``upd_val``.
|
|
|
|
3. If ``upd_val`` is a list, extend list in ``base_dict[upd_key]`` by the
|
|
list in ``upd_val``.
|
|
|
|
4. If ``upd_val`` is a set, update set in ``base_dict[upd_key]`` by set in
|
|
``upd_val``.
|
|
"""
|
|
# pylint: disable=too-many-branches
|
|
if not isinstance(base_dict, dict):
|
|
raise TypeError("argument 'base_dict' is not a ditionary type")
|
|
if not isinstance(upd_dict, dict):
|
|
raise TypeError("argument 'upd_dict' is not a ditionary type")
|
|
|
|
if names is None:
|
|
names = []
|
|
|
|
for upd_key, upd_val in upd_dict.items():
|
|
# For each upd_key & upd_val pair in upd_dict:
|
|
|
|
if isinstance(upd_val, dict):
|
|
|
|
if upd_key in base_dict:
|
|
# if base_dict[upd_key] exists, recursively deep-update it
|
|
if not isinstance(base_dict[upd_key], dict):
|
|
raise TypeError(f"type mismatch {'.'.join(names)}: is not a dict type in base_dict")
|
|
dict_deepupdate(
|
|
base_dict[upd_key],
|
|
upd_val,
|
|
names
|
|
+ [
|
|
upd_key,
|
|
],
|
|
)
|
|
|
|
else:
|
|
# if base_dict[upd_key] not exist, set base_dict[upd_key] from deepcopy of upd_val
|
|
base_dict[upd_key] = copy.deepcopy(upd_val)
|
|
|
|
elif isinstance(upd_val, list):
|
|
|
|
if upd_key in base_dict:
|
|
# if base_dict[upd_key] exists, base_dict[up_key] is extended by
|
|
# the list from upd_val
|
|
if not isinstance(base_dict[upd_key], list):
|
|
raise TypeError(f"type mismatch {'.'.join(names)}: is not a list type in base_dict")
|
|
base_dict[upd_key].extend(upd_val)
|
|
|
|
else:
|
|
# if base_dict[upd_key] doesn't exists, set base_dict[key] from a deepcopy of the
|
|
# list in upd_val.
|
|
base_dict[upd_key] = copy.deepcopy(upd_val)
|
|
|
|
elif isinstance(upd_val, set):
|
|
|
|
if upd_key in base_dict:
|
|
# if base_dict[upd_key] exists, base_dict[up_key] is updated by the set in upd_val
|
|
if not isinstance(base_dict[upd_key], set):
|
|
raise TypeError(f"type mismatch {'.'.join(names)}: is not a set type in base_dict")
|
|
base_dict[upd_key].update(upd_val.copy())
|
|
|
|
else:
|
|
# if base_dict[upd_key] doesn't exists, set base_dict[upd_key] from a copy of the
|
|
# set in upd_val
|
|
base_dict[upd_key] = upd_val.copy()
|
|
|
|
else:
|
|
# for any other type of upd_val replace or add base_dict[upd_key] by a copy
|
|
# of upd_val
|
|
base_dict[upd_key] = copy.copy(upd_val)
|