mirror of
https://github.com/searxng/searxng.git
synced 2024-12-17 23:06:27 +00:00
5731b6b700
Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
245 lines
7.7 KiB
Python
245 lines
7.7 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
# lint: pylint
|
|
# pylint: disable=missing-module-docstring, missing-class-docstring
|
|
|
|
import sys
|
|
from hashlib import sha256
|
|
from importlib import import_module
|
|
from os import listdir, makedirs, remove, stat, utime
|
|
from os.path import abspath, basename, dirname, exists, join
|
|
from shutil import copyfile
|
|
from pkgutil import iter_modules
|
|
from logging import getLogger
|
|
|
|
from searx import logger, settings
|
|
|
|
|
|
# Child of the searx root logger; every message emitted while loading plugins
# goes through it.
logger = logger.getChild("plugins")

# (attribute name, expected type) pairs a plugin module has to provide.
# ``load_plugin`` terminates the process when one is missing or mistyped.
required_attrs = (
    ("name", str),
    ("description", str),
    ("default_on", bool),
)

# (attribute name, expected type) pairs a plugin module may provide.
# ``load_plugin`` replaces a missing or mistyped value with an empty instance
# of the expected type.
optional_attrs = (
    ("js_dependencies", tuple),
    ("css_dependencies", tuple),
    ("preference_section", str),
)
|
|
|
|
|
|
def sha_sum(filename):
    """Return the hexadecimal SHA-256 digest of the file *filename*.

    The file is opened in binary mode and its whole content is hashed.
    """
    digest = sha256()
    with open(filename, "rb") as stream:
        digest.update(stream.read())
    return digest.hexdigest()
|
|
|
|
|
|
def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
    """Copy one plugin resource into *target_dir* and return its web path.

    :param base_path: directory of the plugin module.
    :param resource_path: path of the resource, relative to *base_path*.
    :param name: plugin name, only used in the error message.
    :param target_dir: directory the resource is copied into.
    :param plugin_dir: plugin specific folder name used in the returned path.
    :returns: ``plugins/external_plugins/<plugin_dir>/<file name>``.

    The file is copied only when the target does not exist yet or when the
    SHA-256 sums of source and target differ.  Any I/O error while comparing
    or copying is logged as critical and the process exits with status 3.
    """
    dep_path = join(base_path, resource_path)
    file_name = basename(dep_path)
    target_path = join(target_dir, file_name)

    # The comparison is part of the guarded section: a missing source file
    # combined with a stale target makes sha_sum() raise, and that has to end
    # in the same critical-log / exit path as a failed copy.
    try:
        if not exists(target_path) or sha_sum(dep_path) != sha_sum(target_path):
            copyfile(dep_path, target_path)
            # copy atime_ns and mtime_ns, so the weak ETags (generated by
            # the HTTP server) do not change
            dep_stat = stat(dep_path)
            utime(target_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))
    except IOError:
        logger.critical(
            "failed to copy plugin resource %s for plugin %s", file_name, name
        )
        sys.exit(3)

    # returning with the web path of the resource
    return join("plugins/external_plugins", plugin_dir, file_name)
|
|
|
|
|
|
def prepare_package_resources(plugin, plugin_module_name):
    """Publish the static resources of an external plugin.

    :param plugin: the imported plugin module.
    :param plugin_module_name: module name of the plugin; also used as the
        name of the plugin's folder below ``plugins/external_plugins``.

    Steps:

    1. create ``<static_path>/plugins/external_plugins/<plugin_module_name>``,
    2. sync every entry of ``js_dependencies`` / ``css_dependencies`` into
       that folder and replace the attribute by a tuple of web paths,
    3. remove every file of the folder that is not one of these resources.

    An I/O error in step 1 or 3 is logged as critical and the process exits
    with status 3.
    """
    plugin_base_path = dirname(abspath(plugin.__file__))

    plugin_dir = plugin_module_name
    target_dir = join(
        settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir
    )
    try:
        makedirs(target_dir, exist_ok=True)
    except IOError:
        logger.critical(
            "failed to create resource directory %s for plugin %s",
            target_dir,
            plugin_module_name,
        )
        sys.exit(3)

    # file names that have to stay in target_dir
    resources = set()

    for attr_name in ("js_dependencies", "css_dependencies"):
        if not hasattr(plugin, attr_name):
            continue
        dependencies = getattr(plugin, attr_name)
        resources.update(map(basename, dependencies))
        # Stored as a tuple: that is the type optional_attrs declares for
        # these attributes.
        setattr(
            plugin,
            attr_name,
            tuple(
                sync_resource(
                    plugin_base_path, dep, plugin_module_name, target_dir, plugin_dir
                )
                for dep in dependencies
            ),
        )

    # drop leftovers, e.g. resources of a previous version of the plugin
    for entry in listdir(target_dir):
        if entry in resources:
            continue
        resource_path = join(target_dir, entry)
        try:
            remove(resource_path)
        except IOError:
            logger.critical(
                "failed to remove unused resource file %s for plugin %s",
                resource_path,
                plugin_module_name,
            )
            sys.exit(3)
|
|
|
|
|
|
def load_plugin(plugin_module_name, external):
    """Import and validate the plugin module *plugin_module_name*.

    :param plugin_module_name: dotted module name of the plugin.
    :param external: when true, the plugin's static resources are copied by
        ``prepare_package_resources``.
    :returns: the plugin module, or ``None`` when importing it raised an
        exception that is not considered fatal.

    The process exits with status 3 when the import fails with one of the
    exceptions treated as fatal below, when an attribute of
    ``required_attrs`` is missing or of the wrong type, or when a plugin of
    the ``query`` preference section lacks ``query_keywords`` or
    ``query_examples``.

    On success the module gets the attributes ``id`` (the module name) and
    ``logger``, missing or mistyped ``optional_attrs`` are reset to an empty
    value, and ``default_on`` is overridden when ``enabled_plugins`` is set
    in the settings.
    """
    # pylint: disable=too-many-branches
    try:
        plugin = import_module(plugin_module_name)
    except (
        SyntaxError,
        KeyboardInterrupt,
        SystemExit,
        SystemError,
        ImportError,
        RuntimeError,
    ) as e:
        logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
        sys.exit(3)
    except BaseException:
        logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
        return None

    # difference with searx: use module name instead of the user name
    plugin.id = plugin_module_name

    # every plugin gets a logger named after its module
    plugin.logger = getLogger(plugin_module_name)

    for plugin_attr, plugin_attr_type in required_attrs:
        if not hasattr(plugin, plugin_attr):
            logger.critical(
                '%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr
            )
            sys.exit(3)
        attr = getattr(plugin, plugin_attr)
        if not isinstance(attr, plugin_attr_type):
            type_attr = str(type(attr))
            # plugin first, attribute name second -- same layout as the
            # "missing attribute" message above
            logger.critical(
                '%s: attribute "%s" is of type %s, must be of type %s, cannot load plugin',
                plugin,
                plugin_attr,
                type_attr,
                plugin_attr_type,
            )
            sys.exit(3)

    for plugin_attr, plugin_attr_type in optional_attrs:
        if not hasattr(plugin, plugin_attr) or not isinstance(
            getattr(plugin, plugin_attr), plugin_attr_type
        ):
            setattr(plugin, plugin_attr, plugin_attr_type())

    # The loop above leaves an unset preference_section as "", so the default
    # has to be applied to the empty value (a hasattr() test is always true
    # at this point).
    if not plugin.preference_section:
        plugin.preference_section = "general"

    # query plugin
    if plugin.preference_section == "query":
        for plugin_attr in ("query_keywords", "query_examples"):
            if not hasattr(plugin, plugin_attr):
                logger.critical(
                    'missing attribute "%s", cannot load plugin: %s', plugin_attr, plugin
                )
                sys.exit(3)

    if settings.get("enabled_plugins"):
        # searx compatibility: plugin.name in settings['enabled_plugins']
        plugin.default_on = (
            plugin.name in settings["enabled_plugins"]
            or plugin.id in settings["enabled_plugins"]
        )

    # copy resources if this is an external plugin
    if external:
        prepare_package_resources(plugin, plugin_module_name)

    logger.debug("%s: loaded", plugin_module_name)

    return plugin
|
|
|
|
|
|
def load_and_initialize_plugin(plugin_module_name, external, init_args):
    """Load a plugin and run its optional ``init`` hook.

    :param plugin_module_name: dotted module name of the plugin.
    :param external: passed through to ``load_plugin``.
    :param init_args: positional arguments for the plugin's ``init``.
    :returns: the plugin module, or ``None`` when loading failed, when
        ``init`` returned a false value or when ``init`` raised.
    """
    plugin = load_plugin(plugin_module_name, external)

    # nothing to initialize: not loaded, or the plugin has no init hook
    if not plugin or not hasattr(plugin, 'init'):
        return plugin

    try:
        if plugin.init(*init_args):
            return plugin
        return None
    except Exception:  # pylint: disable=broad-except
        plugin.logger.exception(
            "Exception while calling init, the plugin is disabled"
        )
        return None
|
|
|
|
|
|
class PluginStore:
    """Ordered collection of plugins with a helper to run their hooks."""

    def __init__(self):
        # registered plugins, in registration order
        self.plugins = []

    def __iter__(self):
        """Iterate over the registered plugins in registration order."""
        yield from self.plugins

    def register(self, plugin):
        """Append *plugin* to the store."""
        self.plugins.append(plugin)

    def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
        """Call the hook *plugin_type* on each plugin of *ordered_plugin_list*.

        Plugins without such an attribute are skipped.  The loop stops at the
        first hook returning a false value.  An exception raised by a hook is
        logged on the plugin's logger and the loop goes on.

        :returns: the value of the last hook that returned, ``True`` when no
            hook returned at all.
        """
        # pylint: disable=no-self-use
        outcome = True
        for plugin in ordered_plugin_list:
            if not hasattr(plugin, plugin_type):
                continue
            try:
                outcome = getattr(plugin, plugin_type)(*args, **kwargs)
                if not outcome:
                    break
            except Exception:  # pylint: disable=broad-except
                plugin.logger.exception("Exception while calling %s", plugin_type)
        return outcome


# module-wide store holding the registered plugins
plugins = PluginStore()
|
|
|
|
|
|
def plugin_module_names():
    """Yield ``(module_name, external)`` for every plugin to load.

    First the modules found next to this file are yielded, with their name
    prefixed by this package's name and ``external`` set to ``False``.  Then
    the names listed in ``settings['plugins']`` are yielded with ``external``
    set to ``True``, skipping names that have already been seen.
    """
    seen = set()

    # embedded plugins
    for module_info in iter_modules(path=[dirname(__file__)]):
        yield (".".join((__name__, module_info.name)), False)
        seen.add(module_info.name)

    # external plugins
    for module_name in settings['plugins']:
        if module_name in seen:
            continue
        yield (module_name, True)
        seen.add(module_name)
|
|
|
|
|
|
def initialize(app):
    """Load, initialize and register all plugins.

    Each module named by ``plugin_module_names`` is loaded and its ``init``
    hook, if any, is called with ``(app, settings)``.  Plugins for which
    loading or initialization did not yield a module are left out.
    """
    for module_name, external in plugin_module_names():
        loaded = load_and_initialize_plugin(module_name, external, (app, settings))
        if not loaded:
            continue
        plugins.register(loaded)
|