[refactor] typification of SearXNG (MainResult) / result items (part 2)

The class ReslutContainer has been revised, it can now handle the typed Result
items of classes:

- MainResult
- LegacyResult (a dict wrapper for backward compatibility)

Due to the now complete typing of theses three clases, instead of the *getitem*
accesses, the fields can now be accessed directly via attributes (which is also
supported by the IDE).

Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
This commit is contained in:
Markus Heiser 2025-03-05 17:29:20 +01:00 committed by Markus Heiser
parent d6ce29f7f0
commit 8769b7c6d6
15 changed files with 608 additions and 541 deletions

View file

@ -4,4 +4,7 @@
Answer Results
==============
The :ref:`area answer results` is an area in which additional information can
be displayed.
.. automodule:: searx.result_types.answer

View file

@ -9,7 +9,7 @@ Correction Results
There is still no typing for these result items. The templates can be used as
orientation until the final typing is complete.
The corrections area shows the user alternative search terms.
The :ref:`area corrections results` shows the user alternative search terms.
A result of this type is a very simple dictionary with only one key/value pair

View file

@ -18,22 +18,32 @@ The **sources** can be:
The sources provide the results, which are displayed in different **areas**
depending on the type of result. The areas are:
main results:
.. _area main results:
:ref:`area main results <main search results>`
It is the main area in which -- as is typical for search engines -- the
results that a search engine has found for the search term are displayed.
answers:
.. _area answer results:
:ref:`area answers <result_types.answer>`
This area displays short answers that could be found for the search term.
info box:
.. _area info box:
:ref:`area info box <result_types.infobox>`
An area in which additional information can be displayed, e.g. excerpts from
wikipedia or other sources such as maps.
suggestions:
.. _area suggestions results:
:ref:`area suggestions <result_types.suggestion>`
Suggestions for alternative search terms can be found in this area. These can
be clicked on and a search is carried out with these search terms.
corrections:
.. _area corrections results:
:ref:`area corrections <result_types.corrections>`
Results in this area are like the suggestion of alternative search terms,
which usually result from spelling corrections

View file

@ -9,7 +9,7 @@ Infobox Results
There is still no typing for these result items. The templates can be used as
orientation until the final typing is complete.
The infobox is an area where addtional infos shown to the user.
The :ref:`area info box` is an area where addtional infos shown to the user.
Fields used in the :origin:`infobox.html
<searx/templates/simple/elements/infobox.html>`:
@ -57,4 +57,3 @@ relatedTopics: :py:class:`List <list>`\ [\ :py:class:`dict`\ ]
key/value pair:
- suggestion: :py:class:`str`: suggested search term (mandatory)

View file

@ -0,0 +1,4 @@
.. _result_types.mainresult:
.. autoclass:: searx.result_types._base.MainResult
:members:

View file

@ -1,11 +1,25 @@
============
Main Results
============
.. _main search results:
There is still no typing for the items in the :ref:`main result list`. The
templates can be used as orientation until the final typing is complete.
===================
Main Search Results
===================
- :ref:`template default`
In the :ref:`area main results` the results that a search engine has found for
the search term are displayed.
There is still no typing for all items in the :ref:`main result list`. The
following types have been implemented so far ..
.. toctree::
:maxdepth: 2
main/mainresult
The :ref:`LegacyResult <LegacyResult>` is used internally for the results that
have not yet been typed. The templates can be used as orientation until the
final typing is complete.
- :ref:`template default` / :py:obj:`Result`
- :ref:`template images`
- :ref:`template videos`
- :ref:`template torrent`

View file

@ -9,7 +9,7 @@ Suggestion Results
There is still no typing for these result items. The templates can be used as
orientation until the final typing is complete.
The suggestions area shows the user alternative search terms.
The :ref:`area suggestions results` shows the user alternative search terms.
A result of this type is a very simple dictionary with only one key/value pair

View file

@ -142,3 +142,6 @@ class Engine: # pylint: disable=too-few-public-methods
tokens: List[str]
"""A list of secret tokens to make this engine *private*, more details see
:ref:`private engines`."""
weight: int
"""Weighting of the results of this engine (:ref:`weight <settings engines>`)."""

View file

@ -13,13 +13,13 @@
from __future__ import annotations
__all__ = ["Result", "EngineResults", "AnswerSet", "Answer", "Translations"]
__all__ = ["Result", "MainResult", "EngineResults", "AnswerSet", "Answer", "Translations"]
import abc
from searx import enginelib
from ._base import Result, LegacyResult
from ._base import Result, MainResult, LegacyResult
from .answer import AnswerSet, Answer, Translations
@ -30,13 +30,18 @@ class ResultList(list, abc.ABC):
"""The collection of result types (which have already been implemented)."""
Answer = Answer
MainResult = MainResult
Result = Result
Translations = Translations
# for backward compatibility
LegacyResult = LegacyResult
def __init__(self):
# pylint: disable=useless-parent-delegation
super().__init__()
def add(self, result: Result):
def add(self, result: Result | LegacyResult):
"""Add a :py:`Result` item to the result list."""
self.append(result)

View file

@ -10,6 +10,8 @@
.. autoclass:: Result
:members:
.. _LegacyResult:
.. autoclass:: LegacyResult
:members:
"""
@ -22,9 +24,88 @@ __all__ = ["Result"]
import re
import urllib.parse
import warnings
import typing
import msgspec
from searx import logger as log
WHITESPACE_REGEX = re.compile('( |\t|\n)+', re.M | re.U)
def _normalize_url_fields(result: Result | LegacyResult):
# As soon we need LegacyResult not any longer, we can move this function to
# method Result.normalize_result_fields
if result.url and not result.parsed_url:
if not isinstance(result.url, str):
log.debug('result: invalid URL: %s', str(result))
result.url = ""
result.parsed_url = None
else:
result.parsed_url = urllib.parse.urlparse(result.url)
if result.parsed_url:
result.parsed_url = result.parsed_url._replace(
# if the result has no scheme, use http as default
scheme=result.parsed_url.scheme or "http",
# normalize ``www.example.com`` to ``example.com``
netloc=result.parsed_url.netloc.replace("www.", ""),
# normalize ``example.com/path/`` to ``example.com/path``
path=result.parsed_url.path.rstrip("/"),
)
result.url = result.parsed_url.geturl()
if isinstance(result, LegacyResult) and getattr(result, "infobox", None):
# As soon we have InfoboxResult, we can move this function to method
# InfoboxResult.normalize_result_fields
infobox_urls: list[dict[str, str]] = getattr(result, "urls", [])
for item in infobox_urls:
_url = item.get("url")
if not _url:
continue
_url = urllib.parse.urlparse(_url)
item["url"] = _url._replace(
scheme=_url.scheme or "http",
netloc=_url.netloc.replace("www.", ""),
path=_url.path.rstrip("/"),
).geturl()
infobox_id = getattr(result, "id", None)
if infobox_id:
_url = urllib.parse.urlparse(infobox_id)
result.id = _url._replace(
scheme=_url.scheme or "http",
netloc=_url.netloc.replace("www.", ""),
path=_url.path.rstrip("/"),
).geturl()
def _normalize_text_fields(result: MainResult | LegacyResult):
# As soon we need LegacyResult not any longer, we can move this function to
# method MainResult.normalize_result_fields
# Actually, a type check should not be necessary if the engine is
# implemented correctly. Historically, however, we have always had a type
# check here.
if result.title and not isinstance(result.title, str):
log.debug("result: invalid type of field 'title': %s", str(result))
result.title = str(result)
if result.content and not isinstance(result.content, str):
log.debug("result: invalid type of field 'content': %s", str(result))
result.content = str(result)
# normalize title and content
result.title = WHITESPACE_REGEX.sub(" ", result.title).strip()
result.content = WHITESPACE_REGEX.sub(" ", result.content).strip()
if result.content == result.title:
# avoid duplicate content between the content and title fields
result.content = ""
class Result(msgspec.Struct, kw_only=True):
"""Base class of all result types :ref:`result types`."""
@ -54,21 +135,20 @@ class Result(msgspec.Struct, kw_only=True):
"""
def normalize_result_fields(self):
"""Normalize a result ..
"""Normalize fields ``url`` and ``parse_sql``.
- if field ``url`` is set and field ``parse_url`` is unset, init
``parse_url`` from field ``url``. This method can be extended in the
inheritance.
- If field ``url`` is set and field ``parse_url`` is unset, init
``parse_url`` from field ``url``. The ``url`` field is initialized
with the resulting value in ``parse_url``, if ``url`` and
``parse_url`` are not equal.
- ``www.example.com`` and ``example.com`` are equivalent and are normalized
to ``example.com``.
- ``example.com/path/`` and ``example.com/path`` are equivalent and are
normalized to ``example.com/path``.
"""
if not self.parsed_url and self.url:
self.parsed_url = urllib.parse.urlparse(self.url)
# if the result has no scheme, use http as default
if not self.parsed_url.scheme:
self.parsed_url = self.parsed_url._replace(scheme="http")
self.url = self.parsed_url.geturl()
_normalize_url_fields(self)
def __post_init__(self):
pass
@ -84,7 +164,6 @@ class Result(msgspec.Struct, kw_only=True):
The hash value is used in contexts, e.g. when checking for equality to
identify identical results from different sources (engines).
"""
return id(self)
def __eq__(self, other):
@ -113,12 +192,19 @@ class Result(msgspec.Struct, kw_only=True):
def as_dict(self):
return {f: getattr(self, f) for f in self.__struct_fields__}
def defaults_from(self, other: Result):
"""Fields not set in *self* will be updated from the field values of the
*other*.
"""
for field_name in self.__struct_fields__:
self_val = getattr(self, field_name, False)
other_val = getattr(other, field_name, False)
if self_val:
setattr(self, field_name, other_val)
class MainResult(Result): # pylint: disable=missing-class-docstring
# open_group and close_group should not manged in the Result class (we should rop it from here!)
open_group: bool = False
close_group: bool = False
"""Base class of all result types displayed in :ref:`area main results`."""
title: str = ""
"""Link title of the result item."""
@ -132,6 +218,43 @@ class MainResult(Result): # pylint: disable=missing-class-docstring
thumbnail: str = ""
"""URL of a thumbnail that is displayed in the result item."""
priority: typing.Literal["", "high", "low"] = ""
"""The priority can be set via :ref:`hostnames plugin`, for example."""
engines: set[str] = set()
"""In a merged results list, the names of the engines that found this result
are listed in this field."""
# open_group and close_group should not manged in the Result
# class (we should drop it from here!)
open_group: bool = False
close_group: bool = False
positions: list[int] = []
score: float = 0
category: str = ""
def __hash__(self) -> int:
"""Ordinary url-results are equal if their values for
:py:obj:`Result.template`, :py:obj:`Result.parsed_url` (without scheme)
and :py:obj:`MainResult.img_src` are equal.
"""
if not self.parsed_url:
raise ValueError(f"missing a value in field 'parsed_url': {self}")
url = self.parsed_url
return hash(
f"{self.template}"
+ f"|{url.netloc}|{url.path}|{url.params}|{url.query}|{url.fragment}"
+ f"|{self.img_src}"
)
def normalize_result_fields(self):
super().normalize_result_fields()
_normalize_text_fields(self)
if self.engine:
self.engines.add(self.engine)
class LegacyResult(dict):
"""A wrapper around a legacy result item. The SearXNG core uses this class
@ -150,7 +273,27 @@ class LegacyResult(dict):
"""
UNSET = object()
WHITESPACE_REGEX = re.compile('( |\t|\n)+', re.M | re.U)
# emulate field types from type class Result
url: str | None
template: str
engine: str
parsed_url: urllib.parse.ParseResult | None
# emulate field types from type class MainResult
title: str
content: str
img_src: str
thumbnail: str
priority: typing.Literal["", "high", "low"]
engines: set[str]
positions: list[int]
score: float
category: str
# infobox result
urls: list[dict[str, str]]
attributes: list[dict[str, str]]
def as_dict(self):
return self
@ -159,14 +302,26 @@ class LegacyResult(dict):
super().__init__(*args, **kwargs)
# Init fields with defaults / compare with defaults of the fields in class Result
self.engine = self.get("engine", "")
self.template = self.get("template", "default.html")
self.url = self.get("url", None)
self.parsed_url = self.get("parsed_url", None)
# emulate field types from type class Result
self["url"] = self.get("url")
self["template"] = self.get("template", "default.html")
self["engine"] = self.get("engine", "")
self["parsed_url"] = self.get("parsed_url")
self.content = self.get("content", "")
self.title = self.get("title", "")
# emulate field types from type class MainResult
self["title"] = self.get("title", "")
self["content"] = self.get("content", "")
self["img_src"] = self.get("img_src", "")
self["thumbnail"] = self.get("thumbnail", "")
self["priority"] = self.get("priority", "")
self["engines"] = self.get("engines", set())
self["positions"] = self.get("positions", "")
self["score"] = self.get("score", 0)
self["category"] = self.get("category", "")
if "infobox" in self:
self["urls"] = self.get("urls", [])
self["attributes"] = self.get("attributes", [])
# Legacy types that have already been ported to a type ..
@ -178,13 +333,47 @@ class LegacyResult(dict):
)
self.template = "answer/legacy.html"
if self.template == "keyvalue.html":
warnings.warn(
f"engine {self.engine} is using deprecated `dict` for key/value results"
f" / use a class from searx.result_types",
DeprecationWarning,
)
def __getattr__(self, name: str, default=UNSET) -> typing.Any:
if default == self.UNSET and name not in self:
raise AttributeError(f"LegacyResult object has no field named: {name}")
return self[name]
def __setattr__(self, name: str, val):
self[name] = val
def __hash__(self) -> int: # type: ignore
if "answer" in self:
# deprecated ..
return hash(self["answer"])
if self.template == "images.html":
# image results are equal if their values for template, the url and
# the img_src are equal.
return hash(f"{self.template}|{self.url}|{self.img_src}")
if not any(cls in self for cls in ["suggestion", "correction", "infobox", "number_of_results", "engine_data"]):
# it is a commun url-result ..
return hash(self.url)
# Ordinary url-results are equal if their values for template,
# parsed_url (without schema) and img_src` are equal.
# Code copied from with MainResult.__hash__:
if not self.parsed_url:
raise ValueError(f"missing a value in field 'parsed_url': {self}")
url = self.parsed_url
return hash(
f"{self.template}"
+ f"|{url.netloc}|{url.path}|{url.params}|{url.query}|{url.fragment}"
+ f"|{self.img_src}"
)
return id(self)
def __eq__(self, other):
@ -195,30 +384,13 @@ class LegacyResult(dict):
return f"LegacyResult: {super().__repr__()}"
def __getattr__(self, name: str, default=UNSET):
if default == self.UNSET and name not in self:
raise AttributeError(f"LegacyResult object has no field named: {name}")
return self[name]
def __setattr__(self, name: str, val):
self[name] = val
def normalize_result_fields(self):
_normalize_url_fields(self)
_normalize_text_fields(self)
if self.engine:
self.engines.add(self.engine)
self.title = self.WHITESPACE_REGEX.sub(" ", self.title)
if not self.parsed_url and self.url:
self.parsed_url = urllib.parse.urlparse(self.url)
# if the result has no scheme, use http as default
if not self.parsed_url.scheme:
self.parsed_url = self.parsed_url._replace(scheme="http")
self.url = self.parsed_url.geturl()
if self.content:
self.content = self.WHITESPACE_REGEX.sub(" ", self.content)
if self.content == self.title:
# avoid duplicate content between the content and title fields
self.content = ""
def defaults_from(self, other: LegacyResult):
for k, v in other.items():
if not self.get(k):
self[k] = v

View file

@ -1,143 +1,25 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring
# pylint: disable=missing-module-docstring, missing-class-docstring
from __future__ import annotations
import warnings
import re
from collections import defaultdict
from operator import itemgetter
from threading import RLock
from typing import List, NamedTuple, Set
from urllib.parse import urlparse, unquote
from searx import logger
from searx.engines import engines
from searx.metrics import histogram_observe, counter_add, count_error
from searx.result_types import Result, LegacyResult
from searx import logger as log
import searx.engines
from searx.metrics import histogram_observe, counter_add
from searx.result_types import Result, LegacyResult, MainResult
from searx.result_types.answer import AnswerSet, BaseAnswer
CONTENT_LEN_IGNORED_CHARS_REGEX = re.compile(r'[,;:!?\./\\\\ ()-_]', re.M | re.U)
# return the meaningful length of the content for a result
def result_content_len(content):
if isinstance(content, str):
return len(CONTENT_LEN_IGNORED_CHARS_REGEX.sub('', content))
return 0
def compare_urls(url_a, url_b):
"""Lazy compare between two URL.
"www.example.com" and "example.com" are equals.
"www.example.com/path/" and "www.example.com/path" are equals.
"https://www.example.com/" and "http://www.example.com/" are equals.
Args:
url_a (ParseResult): first URL
url_b (ParseResult): second URL
Returns:
bool: True if url_a and url_b are equals
"""
# ignore www. in comparison
if url_a.netloc.startswith('www.'):
host_a = url_a.netloc.replace('www.', '', 1)
else:
host_a = url_a.netloc
if url_b.netloc.startswith('www.'):
host_b = url_b.netloc.replace('www.', '', 1)
else:
host_b = url_b.netloc
if host_a != host_b or url_a.query != url_b.query or url_a.fragment != url_b.fragment:
return False
# remove / from the end of the url if required
path_a = url_a.path[:-1] if url_a.path.endswith('/') else url_a.path
path_b = url_b.path[:-1] if url_b.path.endswith('/') else url_b.path
return unquote(path_a) == unquote(path_b)
def merge_two_infoboxes(infobox1, infobox2): # pylint: disable=too-many-branches, too-many-statements
# get engines weights
if hasattr(engines[infobox1['engine']], 'weight'):
weight1 = engines[infobox1['engine']].weight
else:
weight1 = 1
if hasattr(engines[infobox2['engine']], 'weight'):
weight2 = engines[infobox2['engine']].weight
else:
weight2 = 1
if weight2 > weight1:
infobox1['engine'] = infobox2['engine']
infobox1['engines'] |= infobox2['engines']
if 'urls' in infobox2:
urls1 = infobox1.get('urls', None)
if urls1 is None:
urls1 = []
for url2 in infobox2.get('urls', []):
unique_url = True
parsed_url2 = urlparse(url2.get('url', ''))
entity_url2 = url2.get('entity')
for url1 in urls1:
if (entity_url2 is not None and url1.get('entity') == entity_url2) or compare_urls(
urlparse(url1.get('url', '')), parsed_url2
):
unique_url = False
break
if unique_url:
urls1.append(url2)
infobox1['urls'] = urls1
if 'img_src' in infobox2:
img1 = infobox1.get('img_src', None)
img2 = infobox2.get('img_src')
if img1 is None:
infobox1['img_src'] = img2
elif weight2 > weight1:
infobox1['img_src'] = img2
if 'attributes' in infobox2:
attributes1 = infobox1.get('attributes')
if attributes1 is None:
infobox1['attributes'] = attributes1 = []
attributeSet = set()
for attribute in attributes1:
label = attribute.get('label')
if label not in attributeSet:
attributeSet.add(label)
entity = attribute.get('entity')
if entity not in attributeSet:
attributeSet.add(entity)
for attribute in infobox2.get('attributes', []):
if attribute.get('label') not in attributeSet and attribute.get('entity') not in attributeSet:
attributes1.append(attribute)
if 'content' in infobox2:
content1 = infobox1.get('content', None)
content2 = infobox2.get('content', '')
if content1 is not None:
if result_content_len(content2) > result_content_len(content1):
infobox1['content'] = content2
else:
infobox1['content'] = content2
def result_score(result, priority):
def calculate_score(result, priority) -> float:
weight = 1.0
for result_engine in result['engines']:
if hasattr(engines.get(result_engine), 'weight'):
weight *= float(engines[result_engine].weight)
if hasattr(searx.engines.engines.get(result_engine), 'weight'):
weight *= float(searx.engines.engines[result_engine].weight)
weight *= len(result['positions'])
score = 0
@ -153,61 +35,53 @@ def result_score(result, priority):
return score
class Timing(NamedTuple): # pylint: disable=missing-class-docstring
class Timing(NamedTuple):
engine: str
total: float
load: float
class UnresponsiveEngine(NamedTuple): # pylint: disable=missing-class-docstring
class UnresponsiveEngine(NamedTuple):
engine: str
error_type: str
suspended: bool
class ResultContainer:
"""docstring for ResultContainer"""
"""In the result container, the results are collected, sorted and duplicates
will be merged."""
__slots__ = (
'_merged_results',
'infoboxes',
'suggestions',
'answers',
'corrections',
'_number_of_results',
'_closed',
'paging',
'unresponsive_engines',
'timings',
'redirect_url',
'engine_data',
'on_result',
'_lock',
)
# pylint: disable=too-many-statements
main_results_map: dict[int, MainResult | LegacyResult]
infoboxes: list[LegacyResult]
suggestions: set[str]
answers: AnswerSet
corrections: set[str]
def __init__(self):
super().__init__()
self._merged_results: list[LegacyResult] = []
self.infoboxes: list[dict] = []
self.suggestions: set[str] = set()
self.main_results_map = {}
self.infoboxes = []
self.suggestions = set()
self.answers = AnswerSet()
self.corrections = set()
self._number_of_results: list[int] = []
self.engine_data: dict[str, str | dict] = defaultdict(dict)
self.engine_data: dict[str, dict[str, str]] = defaultdict(dict)
self._closed: bool = False
self.paging: bool = False
self.unresponsive_engines: Set[UnresponsiveEngine] = set()
self.timings: List[Timing] = []
self.redirect_url = None
self.redirect_url: str | None = None
self.on_result = lambda _: True
self._lock = RLock()
self._main_results_sorted: list[MainResult | LegacyResult] = None # type: ignore
def extend(self, engine_name: str | None, results): # pylint: disable=too-many-branches
if self._closed:
log.debug("container is closed, ignoring results: %s", results)
return
standard_result_count = 0
error_msgs = set()
main_count = 0
for result in list(results):
@ -217,267 +91,284 @@ class ResultContainer:
if isinstance(result, BaseAnswer) and self.on_result(result):
self.answers.add(result)
elif isinstance(result, MainResult) and self.on_result(result):
main_count += 1
self._merge_main_result(result, main_count)
else:
# more types need to be implemented in the future ..
raise NotImplementedError(f"no handler implemented to process the result of type {result}")
else:
result['engine'] = result.get('engine') or engine_name or ""
result["engine"] = result.get("engine") or engine_name or ""
result = LegacyResult(result) # for backward compatibility, will be romeved one day
result.normalize_result_fields()
if 'suggestion' in result and self.on_result(result):
self.suggestions.add(result['suggestion'])
elif 'answer' in result and self.on_result(result):
warnings.warn(
f"answer results from engine {result.engine}"
" are without typification / migrate to Answer class.",
DeprecationWarning,
)
self.answers.add(result)
elif 'correction' in result and self.on_result(result):
self.corrections.add(result['correction'])
elif 'infobox' in result and self.on_result(result):
self._merge_infobox(result)
elif 'number_of_results' in result and self.on_result(result):
self._number_of_results.append(result['number_of_results'])
elif 'engine_data' in result and self.on_result(result):
self.engine_data[result.engine][result['key']] = result['engine_data']
elif result.url:
# standard result (url, title, content)
if not self._is_valid_url_result(result, error_msgs):
continue
# normalize the result
result.normalize_result_fields()
# call on_result call searx.search.SearchWithPlugins._on_result
# which calls the plugins
if not self.on_result(result):
continue
self.__merge_url_result(result, standard_result_count + 1)
standard_result_count += 1
elif self.on_result(result):
self.__merge_result_no_url(result, standard_result_count + 1)
standard_result_count += 1
if "suggestion" in result:
if self.on_result(result):
self.suggestions.add(result["suggestion"])
continue
if len(error_msgs) > 0:
for msg in error_msgs:
count_error(engine_name, 'some results are invalids: ' + msg, secondary=True)
if "answer" in result:
if self.on_result(result):
warnings.warn(
f"answer results from engine {result.engine}"
" are without typification / migrate to Answer class.",
DeprecationWarning,
)
self.answers.add(result) # type: ignore
continue
if engine_name in engines:
histogram_observe(standard_result_count, 'engine', engine_name, 'result', 'count')
if "correction" in result:
if self.on_result(result):
self.corrections.add(result["correction"])
continue
if not self.paging and engine_name in engines and engines[engine_name].paging:
self.paging = True
if "infobox" in result:
if self.on_result(result):
self._merge_infobox(result)
continue
def _merge_infobox(self, infobox):
if "number_of_results" in result:
if self.on_result(result):
self._number_of_results.append(result["number_of_results"])
continue
if "engine_data" in result:
if self.on_result(result):
if result.engine:
self.engine_data[result.engine][result["key"]] = result["engine_data"]
continue
if self.on_result(result):
main_count += 1
self._merge_main_result(result, main_count)
continue
if engine_name in searx.engines.engines:
eng = searx.engines.engines[engine_name]
histogram_observe(main_count, "engine", eng.name, "result", "count")
if not self.paging and eng.paging:
self.paging = True
def _merge_infobox(self, new_infobox: LegacyResult):
add_infobox = True
infobox_id = infobox.get('id', None)
infobox['engines'] = set([infobox['engine']])
if infobox_id is not None:
parsed_url_infobox_id = urlparse(infobox_id)
new_id = getattr(new_infobox, "id", None)
if new_id is not None:
with self._lock:
for existingIndex in self.infoboxes:
if compare_urls(urlparse(existingIndex.get('id', '')), parsed_url_infobox_id):
merge_two_infoboxes(existingIndex, infobox)
for existing_infobox in self.infoboxes:
if new_id == getattr(existing_infobox, "id", None):
merge_two_infoboxes(existing_infobox, new_infobox)
add_infobox = False
if add_infobox:
self.infoboxes.append(infobox)
self.infoboxes.append(new_infobox)
def _is_valid_url_result(self, result, error_msgs):
if 'url' in result:
if not isinstance(result['url'], str):
logger.debug('result: invalid URL: %s', str(result))
error_msgs.add('invalid URL')
return False
def _merge_main_result(self, result: MainResult | LegacyResult, position):
result_hash = hash(result)
if 'title' in result and not isinstance(result['title'], str):
logger.debug('result: invalid title: %s', str(result))
error_msgs.add('invalid title')
return False
if 'content' in result:
if not isinstance(result['content'], str):
logger.debug('result: invalid content: %s', str(result))
error_msgs.add('invalid content')
return False
return True
def __merge_url_result(self, result, position):
result['engines'] = set([result['engine']])
with self._lock:
duplicated = self.__find_duplicated_http_result(result)
if duplicated:
self.__merge_duplicated_http_result(duplicated, result, position)
merged = self.main_results_map.get(result_hash)
if not merged:
# if there is no duplicate in the merged results, append result
result.positions = [position]
self.main_results_map[result_hash] = result
return
# if there is no duplicate found, append result
result['positions'] = [position]
self._merged_results.append(result)
def __find_duplicated_http_result(self, result):
result_template = result.get('template')
for merged_result in self._merged_results:
if not merged_result.get('parsed_url'):
continue
if compare_urls(result['parsed_url'], merged_result['parsed_url']) and result_template == merged_result.get(
'template'
):
if result_template != 'images.html':
# not an image, same template, same url : it's a duplicate
return merged_result
# it's an image
# it's a duplicate if the parsed_url, template and img_src are different
if result.get('img_src', '') == merged_result.get('img_src', ''):
return merged_result
return None
def __merge_duplicated_http_result(self, duplicated, result, position):
# use content with more text
if result_content_len(result.get('content', '')) > result_content_len(duplicated.get('content', '')):
duplicated['content'] = result['content']
# use title with more text
if result_content_len(result.get('title', '')) > len(duplicated.get('title', '')):
duplicated['title'] = result['title']
# merge all result's parameters not found in duplicate
for key in result.keys():
if not duplicated.get(key):
duplicated[key] = result.get(key)
# add the new position
duplicated['positions'].append(position)
# add engine to list of result-engines
duplicated['engines'].add(result['engine'])
# use https if possible
if duplicated['parsed_url'].scheme != 'https' and result['parsed_url'].scheme == 'https':
duplicated['url'] = result['parsed_url'].geturl()
duplicated['parsed_url'] = result['parsed_url']
def __merge_result_no_url(self, result, position):
result['engines'] = set([result['engine']])
result['positions'] = [position]
with self._lock:
self._merged_results.append(result)
merge_two_main_results(merged, result)
# add the new position
merged.positions.append(position)
def close(self):
self._closed = True
for result in self._merged_results:
result['score'] = result_score(result, result.get('priority'))
# removing html content and whitespace duplications
if result.get('content'):
result['content'] = result['content'].strip()
if result.get('title'):
result['title'] = ' '.join(result['title'].strip().split())
for result in self.main_results_map.values():
result.score = calculate_score(result, result.priority)
for eng_name in result.engines:
counter_add(result.score, 'engine', eng_name, 'score')
for result_engine in result['engines']:
counter_add(result['score'], 'engine', result_engine, 'score')
def get_ordered_results(self) -> list[MainResult | LegacyResult]:
"""Returns a sorted list of results to be displayed in the main result
area (:ref:`result types`)."""
results = sorted(self._merged_results, key=itemgetter('score'), reverse=True)
if not self._closed:
self.close()
if self._main_results_sorted:
return self._main_results_sorted
# first pass, sort results by "score" (descanding)
results = sorted(self.main_results_map.values(), key=lambda x: x.score, reverse=True)
# pass 2 : group results by category and template
gresults = []
categoryPositions = {}
max_count = 8
max_distance = 20
for res in results:
if not res.get('url'):
continue
# do we need to handle more than one category per engine?
engine = searx.engines.engines.get(res.engine or "")
if engine:
res.category = engine.categories[0] if len(engine.categories) > 0 else ""
# do we need to handle more than one category per engine?
engine = engines[res['engine']]
res['category'] = engine.categories[0] if len(engine.categories) > 0 else ''
category = f"{res.category}:{res.template}:{'img_src' if (res.thumbnail or res.img_src) else ''}"
grp = categoryPositions.get(category)
# do we need to handle more than one category per engine?
category = (
res['category']
+ ':'
+ res.get('template', '')
+ ':'
+ ('img_src' if 'img_src' in res or 'thumbnail' in res else '')
)
# group with previous results using the same category, if the group
# can accept more result and is not too far from the current
# position
current = None if category not in categoryPositions else categoryPositions[category]
# group with previous results using the same category
# if the group can accept more result and is not too far
# from the current position
if current is not None and (current['count'] > 0) and (len(gresults) - current['index'] < 20):
# group with the previous results using
# the same category with this one
index = current['index']
if (grp is not None) and (grp["count"] > 0) and (len(gresults) - grp["index"] < max_distance):
# group with the previous results using the same category with
# this one
index = grp["index"]
gresults.insert(index, res)
# update every index after the current one
# (including the current one)
for k in categoryPositions: # pylint: disable=consider-using-dict-items
v = categoryPositions[k]['index']
# update every index after the current one (including the
# current one)
for item in categoryPositions.values():
v = item["index"]
if v >= index:
categoryPositions[k]['index'] = v + 1
item["index"] = v + 1
# update this category
current['count'] -= 1
grp["count"] -= 1
else:
# same category
gresults.append(res)
# update categoryIndex
categoryPositions[category] = {'index': len(gresults), 'count': 8}
categoryPositions[category] = {"index": len(gresults), "count": max_count}
continue
# update _merged_results
self._merged_results = gresults
def get_ordered_results(self):
if not self._closed:
self.close()
return self._merged_results
def results_length(self):
return len(self._merged_results)
self._main_results_sorted = gresults
return self._main_results_sorted
@property
def number_of_results(self) -> int:
"""Returns the average of results number, returns zero if the average
result number is smaller than the actual result count."""
with self._lock:
if not self._closed:
logger.error("call to ResultContainer.number_of_results before ResultContainer.close")
return 0
if not self._closed:
log.error("call to ResultContainer.number_of_results before ResultContainer.close")
return 0
with self._lock:
resultnum_sum = sum(self._number_of_results)
if not resultnum_sum or not self._number_of_results:
return 0
average = int(resultnum_sum / len(self._number_of_results))
if average < self.results_length():
if average < len(self.get_ordered_results()):
average = 0
return average
def add_unresponsive_engine(self, engine_name: str, error_type: str, suspended: bool = False):
with self._lock:
if self._closed:
logger.error("call to ResultContainer.add_unresponsive_engine after ResultContainer.close")
log.error("call to ResultContainer.add_unresponsive_engine after ResultContainer.close")
return
if engines[engine_name].display_error_messages:
if searx.engines.engines[engine_name].display_error_messages:
self.unresponsive_engines.add(UnresponsiveEngine(engine_name, error_type, suspended))
def add_timing(self, engine_name: str, engine_time: float, page_load_time: float):
with self._lock:
if self._closed:
logger.error("call to ResultContainer.add_timing after ResultContainer.close")
log.error("call to ResultContainer.add_timing after ResultContainer.close")
return
self.timings.append(Timing(engine_name, total=engine_time, load=page_load_time))
def get_timings(self):
with self._lock:
if not self._closed:
logger.error("call to ResultContainer.get_timings before ResultContainer.close")
log.error("call to ResultContainer.get_timings before ResultContainer.close")
return []
return self.timings
def merge_two_infoboxes(origin: LegacyResult, other: LegacyResult):
"""Merges the values from ``other`` into ``origin``."""
# pylint: disable=too-many-branches
weight1 = getattr(searx.engines.engines[origin.engine], "weight", 1)
weight2 = getattr(searx.engines.engines[other.engine], "weight", 1)
if weight2 > weight1:
origin.engine = other.engine
origin.engines |= other.engines
if other.urls:
url_items = origin.get("urls", [])
for url2 in other.urls:
unique_url = True
entity_url2 = url2.get("entity")
for url1 in origin.get("urls", []):
if (entity_url2 is not None and entity_url2 == url1.get("entity")) or (
url1.get("url") == url2.get("url")
):
unique_url = False
break
if unique_url:
url_items.append(url2)
origin.urls = url_items
if other.img_src:
if not origin.img_src:
origin.img_src = other.img_src
elif weight2 > weight1:
origin.img_src = other.img_src
if other.attributes:
if not origin.attributes:
origin.attributes = other.attributes
else:
attr_names_1 = set()
for attr in origin.attributes:
label = attr.get("label")
if label:
attr_names_1.add(label)
entity = attr.get("entity")
if entity:
attr_names_1.add(entity)
for attr in other.attributes:
if attr.get("label") not in attr_names_1 and attr.get('entity') not in attr_names_1:
origin.attributes.append(attr)
if other.content:
if not origin.content:
origin.content = other.content
elif len(other.content) > len(origin.content):
origin.content = other.content
def merge_two_main_results(origin: MainResult | LegacyResult, other: MainResult | LegacyResult):
"""Merges the values from ``other`` into ``origin``."""
if len(other.content) > len(origin.content):
# use content with more text
origin.content = other.content
# use title with more text
if len(other.title) > len(origin.title):
origin.title = other.title
# merge all result's parameters not found in origin
if isinstance(other, MainResult) and isinstance(origin, MainResult):
origin.defaults_from(other)
elif isinstance(other, LegacyResult) and isinstance(origin, LegacyResult):
origin.defaults_from(other)
# add engine to list of result-engines
origin.engines.add(other.engine or "")
# use https, ftps, .. if possible
if origin.parsed_url and not origin.parsed_url.scheme.endswith("s"):
if other.parsed_url and other.parsed_url.scheme.endswith("s"):
origin.parsed_url = origin.parsed_url._replace(scheme=other.parsed_url.scheme)
origin.url = origin.parsed_url.geturl()

View file

@ -2,6 +2,8 @@
# pylint: disable=missing-module-docstring,disable=missing-class-docstring,invalid-name
from searx.engines import command as command_engine
from searx.result_types import KeyValue
from tests import SearxTestCase
@ -12,14 +14,15 @@ class TestCommandEngine(SearxTestCase):
ls_engine.command = ['seq', '{{QUERY}}']
ls_engine.delimiter = {'chars': ' ', 'keys': ['number']}
expected_results = [
{'number': '1', 'template': 'key-value.html'},
{'number': '2', 'template': 'key-value.html'},
{'number': '3', 'template': 'key-value.html'},
{'number': '4', 'template': 'key-value.html'},
{'number': '5', 'template': 'key-value.html'},
KeyValue(kvmap={'number': 1}),
KeyValue(kvmap={'number': 2}),
KeyValue(kvmap={'number': 3}),
KeyValue(kvmap={'number': 4}),
KeyValue(kvmap={'number': 5}),
]
results = ls_engine.search('5', {'pageno': 1})
self.assertEqual(results, expected_results)
for i, expected in enumerate(expected_results):
self.assertEqual(results[i].kvmap["number"], str(expected.kvmap["number"]))
def test_delimiter_parsing(self):
searx_logs = '''DEBUG:searx.webapp:static directory is /home/n/p/searx/searx/static
@ -39,94 +42,85 @@ INFO:werkzeug: * Debugger PIN: 299-578-362'''
echo_engine.command = ['echo', searx_logs]
echo_engine.delimiter = {'chars': ':', 'keys': ['level', 'component', 'message']}
expected_results_by_page = [
[
{
'component': 'searx.webapp',
'message': 'static directory is /home/n/p/searx/searx/static',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'searx.webapp',
'message': 'templates directory is /home/n/p/searx/searx/templates',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'soundcloud engine: Starting background initialization',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'wolframalpha engine: Starting background initialization',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'locate engine: Starting background initialization',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'regex search in files engine: Starting background initialization',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'urllib3.connectionpool',
'message': 'Starting new HTTPS connection (1): www.wolframalpha.com',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'urllib3.connectionpool',
'message': 'Starting new HTTPS connection (1): soundcloud.com',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'find engine: Starting background initialization',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'pattern search in files engine: Starting background initialization',
'template': 'key-value.html',
'level': 'DEBUG',
},
],
[
{
'component': 'searx.webapp',
'message': 'starting webserver on 127.0.0.1:8888',
'template': 'key-value.html',
'level': 'DEBUG',
},
{
'component': 'werkzeug',
'message': ' * Debugger is active!',
'template': 'key-value.html',
'level': 'WARNING',
},
{
'component': 'werkzeug',
'message': ' * Debugger PIN: 299-578-362',
'template': 'key-value.html',
'level': 'INFO',
},
],
page1 = [
{
'component': 'searx.webapp',
'message': 'static directory is /home/n/p/searx/searx/static',
'level': 'DEBUG',
},
{
'component': 'searx.webapp',
'message': 'templates directory is /home/n/p/searx/searx/templates',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'soundcloud engine: Starting background initialization',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'wolframalpha engine: Starting background initialization',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'locate engine: Starting background initialization',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'regex search in files engine: Starting background initialization',
'level': 'DEBUG',
},
{
'component': 'urllib3.connectionpool',
'message': 'Starting new HTTPS connection (1): www.wolframalpha.com',
'level': 'DEBUG',
},
{
'component': 'urllib3.connectionpool',
'message': 'Starting new HTTPS connection (1): soundcloud.com',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'find engine: Starting background initialization',
'level': 'DEBUG',
},
{
'component': 'searx.engines',
'message': 'pattern search in files engine: Starting background initialization',
'level': 'DEBUG',
},
]
page2 = [
{
'component': 'searx.webapp',
'message': 'starting webserver on 127.0.0.1:8888',
'level': 'DEBUG',
},
{
'component': 'werkzeug',
'message': ' * Debugger is active!',
'level': 'WARNING',
},
{
'component': 'werkzeug',
'message': ' * Debugger PIN: 299-578-362',
'level': 'INFO',
},
]
page1 = [KeyValue(kvmap=row) for row in page1]
page2 = [KeyValue(kvmap=row) for row in page2]
expected_results_by_page = [page1, page2]
for i in [0, 1]:
results = echo_engine.search('', {'pageno': i + 1})
self.assertEqual(results, expected_results_by_page[i])
page = expected_results_by_page[i]
for i, expected in enumerate(page):
self.assertEqual(expected.kvmap["message"], str(results[i].kvmap["message"]))
def test_regex_parsing(self):
txt = '''commit 35f9a8c81d162a361b826bbcd4a1081a4fbe76a7
@ -165,26 +159,25 @@ commit '''
'author': ' Noémi Ványi <sitbackandwait@gmail.com>',
'date': 'Date: Tue Oct 15 11:31:33 2019 +0200',
'message': '\n\nfirst interesting message',
'template': 'key-value.html',
},
{
'commit': '6c3c206316153ccc422755512bceaa9ab0b14faa',
'author': ' Noémi Ványi <sitbackandwait@gmail.com>',
'date': 'Date: Mon Oct 14 17:10:08 2019 +0200',
'message': '\n\nsecond interesting message',
'template': 'key-value.html',
},
{
'commit': 'd8594d2689b4d5e0d2f80250223886c3a1805ef5',
'author': ' Noémi Ványi <sitbackandwait@gmail.com>',
'date': 'Date: Mon Oct 14 14:45:05 2019 +0200',
'message': '\n\nthird interesting message',
'template': 'key-value.html',
},
]
expected_results = [KeyValue(kvmap=kvmap) for kvmap in expected_results]
results = git_log_engine.search('', {'pageno': 1})
self.assertEqual(results, expected_results)
for i, expected in enumerate(expected_results):
self.assertEqual(expected.kvmap["message"], str(results[i].kvmap["message"]))
def test_working_dir_path_query(self):
ls_engine = command_engine

View file

@ -1,30 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring,disable=missing-class-docstring,invalid-name
from unittest.mock import MagicMock, Mock
from searx.engines import mariadb_server
from tests import SearxTestCase
class MariadbServerTests(SearxTestCase):
def test_init_no_query_str_raises(self):
self.assertRaises(ValueError, lambda: mariadb_server.init({}))
def test_init_non_select_raises(self):
self.assertRaises(ValueError, lambda: mariadb_server.init({'query_str': 'foobar'}))
def test_search_returns_results(self):
test_string = 'FOOBAR'
cursor_mock = MagicMock()
with cursor_mock as setup: # pylint: disable=not-context-manager
setup.__iter__ = Mock(return_value=iter([{test_string, 1}]))
setup.description = [[test_string]]
conn_mock = Mock()
conn_mock.cursor.return_value = cursor_mock
mariadb_server._connection = conn_mock # pylint: disable=protected-access
results = mariadb_server.search(test_string, {'pageno': 1})
self.assertEqual(1, len(results))
self.assertIn(test_string, results[0])
self.assertEqual(mariadb_server.result_template, results[0]['template'])

View file

@ -23,8 +23,11 @@ class ResultContainerTestCase(SearxTestCase):
container.extend("google", [result])
container.close()
self.assertEqual(container.results_length(), 1)
self.assertIn(LegacyResult(result), container.get_ordered_results())
self.assertEqual(len(container.get_ordered_results()), 1)
res = LegacyResult(result)
res.normalize_result_fields()
self.assertIn(res, container.get_ordered_results())
def test_one_suggestion(self):
result = dict(suggestion="lorem ipsum ..")
@ -33,7 +36,7 @@ class ResultContainerTestCase(SearxTestCase):
container.extend("duckduckgo", [result])
container.close()
self.assertEqual(container.results_length(), 0)
self.assertEqual(len(container.get_ordered_results()), 0)
self.assertEqual(len(container.suggestions), 1)
self.assertIn(result["suggestion"], container.suggestions)
@ -42,6 +45,7 @@ class ResultContainerTestCase(SearxTestCase):
result = LegacyResult(
url="https://example.org", title="very long title, lorem ipsum", content="Lorem ipsum dolor sit amet .."
)
result.normalize_result_fields()
eng1 = dict(url=result.url, title="short title", content=result.content, engine="google")
eng2 = dict(url="http://example.org", title=result.title, content="lorem ipsum", engine="duckduckgo")
@ -50,7 +54,7 @@ class ResultContainerTestCase(SearxTestCase):
container.close()
result_list = container.get_ordered_results()
self.assertEqual(container.results_length(), 1)
self.assertEqual(len(container.get_ordered_results()), 1)
self.assertIn(result, result_list)
self.assertEqual(result_list[0].title, result.title)
self.assertEqual(result_list[0].content, result.content)

View file

@ -148,11 +148,10 @@ class ViewsTestCase(SearxTestCase): # pylint: disable=too-many-public-methods
def test_search_csv(self):
result = self.client.post('/search', data={'q': 'test', 'format': 'csv'})
self.assertEqual(
b'title,url,content,host,engine,score,type\r\n'
b'First Test,http://first.test.xyz,first test content,first.test.xyz,startpage,,result\r\n' # noqa
b'Second Test,http://second.test.xyz,second test content,second.test.xyz,youtube,,result\r\n', # noqa
+ b'First Test,http://first.test.xyz,first test content,first.test.xyz,startpage,0,result\r\n'
+ b'Second Test,http://second.test.xyz,second test content,second.test.xyz,youtube,0,result\r\n',
result.data,
)