[mod] check the engine tokens in searx/webadapter.py instead of searx/search.py

This commit is contained in:
Alexandre Flament 2020-09-22 16:55:59 +02:00
parent eecfff2689
commit 691d12726b
6 changed files with 95 additions and 64 deletions

View file

@ -62,7 +62,7 @@ class SearchQuery:
"""container for all the search parameters (query, language, etc...)""" """container for all the search parameters (query, language, etc...)"""
def __init__(self, query, engineref_list, categories, lang, safesearch, pageno, time_range, def __init__(self, query, engineref_list, categories, lang, safesearch, pageno, time_range,
timeout_limit=None, preferences=None, external_bang=None): timeout_limit=None, external_bang=None):
self.query = query self.query = query
self.engineref_list = engineref_list self.engineref_list = engineref_list
self.categories = categories self.categories = categories
@ -71,7 +71,6 @@ class SearchQuery:
self.pageno = pageno self.pageno = pageno
self.time_range = time_range self.time_range = time_range
self.timeout_limit = timeout_limit self.timeout_limit = timeout_limit
self.preferences = preferences
self.external_bang = external_bang self.external_bang = external_bang
def __str__(self): def __str__(self):
@ -311,9 +310,6 @@ class Search:
return False return False
def _is_accepted(self, engine_name, engine): def _is_accepted(self, engine_name, engine):
if not self.search_query.preferences.validate_token(engine):
return False
# skip suspended engines # skip suspended engines
if engine.suspend_end_time >= time(): if engine.suspend_end_time >= time():
logger.debug('Engine currently suspended: %s', engine_name) logger.debug('Engine currently suspended: %s', engine_name)

View file

@ -11,6 +11,31 @@ def deduplicate_engineref_list(engineref_list):
return engineref_dict.values() return engineref_dict.values()
def validate_engineref_list(engineref_list, preferences):
"""
Validate query_engines according to the preferences
Returns:
list of existing engines with a validated token
list of unknown engine
list of engine with invalid token according to the preferences
"""
valid = []
unknown = []
no_token = []
for engineref in engineref_list:
if engineref.name not in engines:
unknown.append(engineref)
continue
engine = engines[engineref.name]
if not preferences.validate_token(engine):
no_token.append(engineref)
continue
valid.append(engineref)
return valid, unknown, no_token
def get_search_query_from_webapp(preferences, form): def get_search_query_from_webapp(preferences, form):
# no text for the query ? # no text for the query ?
if not form.get('q'): if not form.get('q'):
@ -152,10 +177,14 @@ def get_search_query_from_webapp(preferences, form):
if (engine.name, categ) not in disabled_engines) if (engine.name, categ) not in disabled_engines)
query_engineref_list = deduplicate_engineref_list(query_engineref_list) query_engineref_list = deduplicate_engineref_list(query_engineref_list)
query_engineref_list, query_engineref_list_unknown, query_engineref_list_notoken =\
validate_engineref_list(query_engineref_list, preferences)
external_bang = raw_text_query.external_bang external_bang = raw_text_query.external_bang
return (SearchQuery(query, query_engineref_list, query_categories, return (SearchQuery(query, query_engineref_list, query_categories,
query_lang, query_safesearch, query_pageno, query_lang, query_safesearch, query_pageno,
query_time_range, query_timeout, preferences, query_time_range, query_timeout,
external_bang=external_bang), external_bang=external_bang),
raw_text_query) raw_text_query,
query_engineref_list_unknown,
query_engineref_list_notoken)

View file

@ -562,7 +562,7 @@ def index():
raw_text_query = None raw_text_query = None
result_container = None result_container = None
try: try:
search_query, raw_text_query = get_search_query_from_webapp(request.preferences, request.form) search_query, raw_text_query, _, _ = get_search_query_from_webapp(request.preferences, request.form)
# search = Search(search_query) # without plugins # search = Search(search_query) # without plugins
search = SearchWithPlugins(search_query, request.user_plugins, request) search = SearchWithPlugins(search_query, request.user_plugins, request)

View file

@ -1,17 +1,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from searx.testing import SearxTestCase from searx.testing import SearxTestCase
from searx.preferences import Preferences
from searx.engines import engines
import searx.search
from searx.search import SearchQuery, EngineRef from searx.search import SearchQuery, EngineRef
import searx.search
SAFESEARCH = 0 SAFESEARCH = 0
PAGENO = 1 PAGENO = 1
PUBLIC_ENGINE_NAME = 'general dummy' PUBLIC_ENGINE_NAME = 'general dummy'
PRIVATE_ENGINE_NAME = 'general private offline'
TEST_ENGINES = [ TEST_ENGINES = [
{ {
'name': PUBLIC_ENGINE_NAME, 'name': PUBLIC_ENGINE_NAME,
@ -21,15 +17,6 @@ TEST_ENGINES = [
'timeout': 3.0, 'timeout': 3.0,
'tokens': [], 'tokens': [],
}, },
{
'name': PRIVATE_ENGINE_NAME,
'engine': 'dummy-offline',
'categories': 'general',
'shortcut': 'do',
'timeout': 3.0,
'offline': True,
'tokens': ['my-token'],
},
] ]
@ -42,8 +29,7 @@ class SearchTestCase(SearxTestCase):
def test_timeout_simple(self): def test_timeout_simple(self):
searx.search.max_request_timeout = None searx.search.max_request_timeout = None
search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')], search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, None, ['general'], 'en-US', SAFESEARCH, PAGENO, None, None)
preferences=Preferences(['oscar'], ['general'], engines, []))
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
search.search() search.search()
self.assertEqual(search.actual_timeout, 3.0) self.assertEqual(search.actual_timeout, 3.0)
@ -51,8 +37,7 @@ class SearchTestCase(SearxTestCase):
def test_timeout_query_above_default_nomax(self): def test_timeout_query_above_default_nomax(self):
searx.search.max_request_timeout = None searx.search.max_request_timeout = None
search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')], search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, 5.0, ['general'], 'en-US', SAFESEARCH, PAGENO, None, 5.0)
preferences=Preferences(['oscar'], ['general'], engines, []))
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
search.search() search.search()
self.assertEqual(search.actual_timeout, 3.0) self.assertEqual(search.actual_timeout, 3.0)
@ -60,8 +45,7 @@ class SearchTestCase(SearxTestCase):
def test_timeout_query_below_default_nomax(self): def test_timeout_query_below_default_nomax(self):
searx.search.max_request_timeout = None searx.search.max_request_timeout = None
search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')], search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, 1.0, ['general'], 'en-US', SAFESEARCH, PAGENO, None, 1.0)
preferences=Preferences(['oscar'], ['general'], engines, []))
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
search.search() search.search()
self.assertEqual(search.actual_timeout, 1.0) self.assertEqual(search.actual_timeout, 1.0)
@ -69,8 +53,7 @@ class SearchTestCase(SearxTestCase):
def test_timeout_query_below_max(self): def test_timeout_query_below_max(self):
searx.search.max_request_timeout = 10.0 searx.search.max_request_timeout = 10.0
search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')], search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, 5.0, ['general'], 'en-US', SAFESEARCH, PAGENO, None, 5.0)
preferences=Preferences(['oscar'], ['general'], engines, []))
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
search.search() search.search()
self.assertEqual(search.actual_timeout, 5.0) self.assertEqual(search.actual_timeout, 5.0)
@ -78,45 +61,15 @@ class SearchTestCase(SearxTestCase):
def test_timeout_query_above_max(self): def test_timeout_query_above_max(self):
searx.search.max_request_timeout = 10.0 searx.search.max_request_timeout = 10.0
search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')], search_query = SearchQuery('test', [EngineRef(PUBLIC_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, 15.0, ['general'], 'en-US', SAFESEARCH, PAGENO, None, 15.0)
preferences=Preferences(['oscar'], ['general'], engines, []))
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
search.search() search.search()
self.assertEqual(search.actual_timeout, 10.0) self.assertEqual(search.actual_timeout, 10.0)
def test_query_private_engine_without_token(self):
search_query = SearchQuery('test', [EngineRef(PRIVATE_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, 2.0,
preferences=Preferences(['oscar'], ['general'], engines, []))
search = searx.search.Search(search_query)
results = search.search()
self.assertEqual(results.results_length(), 0)
def test_query_private_engine_with_incorrect_token(self):
preferences_with_tokens = Preferences(['oscar'], ['general'], engines, [])
preferences_with_tokens.parse_dict({'tokens': 'bad-token'})
search_query = SearchQuery('test', [EngineRef(PRIVATE_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, 2.0,
preferences=preferences_with_tokens)
search = searx.search.Search(search_query)
results = search.search()
self.assertEqual(results.results_length(), 0)
def test_query_private_engine_with_correct_token(self):
preferences_with_tokens = Preferences(['oscar'], ['general'], engines, [])
preferences_with_tokens.parse_dict({'tokens': 'my-token'})
search_query = SearchQuery('test', [EngineRef(PRIVATE_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, 2.0,
preferences=preferences_with_tokens)
search = searx.search.Search(search_query)
results = search.search()
self.assertEqual(results.results_length(), 1)
def test_external_bang(self): def test_external_bang(self):
search_query = SearchQuery('yes yes', search_query = SearchQuery('yes yes',
[EngineRef(PUBLIC_ENGINE_NAME, 'general')], [EngineRef(PUBLIC_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, None, ['general'], 'en-US', SAFESEARCH, PAGENO, None, None,
preferences=Preferences(['oscar'], ['general'], engines, [],),
external_bang="yt") external_bang="yt")
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
results = search.search() results = search.search()
@ -125,8 +78,7 @@ class SearchTestCase(SearxTestCase):
search_query = SearchQuery('youtube never gonna give you up', search_query = SearchQuery('youtube never gonna give you up',
[EngineRef(PUBLIC_ENGINE_NAME, 'general')], [EngineRef(PUBLIC_ENGINE_NAME, 'general')],
['general'], 'en-US', SAFESEARCH, PAGENO, None, None, ['general'], 'en-US', SAFESEARCH, PAGENO, None, None)
preferences=Preferences(['oscar'], ['general'], engines, []),)
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
results = search.search() results = search.search()

View file

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
from searx.testing import SearxTestCase
from searx.preferences import Preferences
from searx.engines import engines
import searx.search
from searx.search import EngineRef, SearchQuery
from searx.webadapter import validate_engineref_list
PRIVATE_ENGINE_NAME = 'general private offline'
TEST_ENGINES = [
{
'name': PRIVATE_ENGINE_NAME,
'engine': 'dummy-offline',
'categories': 'general',
'shortcut': 'do',
'timeout': 3.0,
'offline': True,
'tokens': ['my-token'],
},
]
SEARCHQUERY = [EngineRef(PRIVATE_ENGINE_NAME, 'general')]
class ValidateQueryCase(SearxTestCase):
@classmethod
def setUpClass(cls):
searx.engines.initialize_engines(TEST_ENGINES)
def test_query_private_engine_without_token(self):
preferences = Preferences(['oscar'], ['general'], engines, [])
valid, unknown, invalid_token = validate_engineref_list(SEARCHQUERY, preferences)
self.assertEqual(len(valid), 0)
self.assertEqual(len(unknown), 0)
self.assertEqual(len(invalid_token), 1)
def test_query_private_engine_with_incorrect_token(self):
preferences_with_tokens = Preferences(['oscar'], ['general'], engines, [])
preferences_with_tokens.parse_dict({'tokens': 'bad-token'})
valid, unknown, invalid_token = validate_engineref_list(SEARCHQUERY, preferences_with_tokens)
self.assertEqual(len(valid), 0)
self.assertEqual(len(unknown), 0)
self.assertEqual(len(invalid_token), 1)
def test_query_private_engine_with_correct_token(self):
preferences_with_tokens = Preferences(['oscar'], ['general'], engines, [])
preferences_with_tokens.parse_dict({'tokens': 'my-token'})
valid, unknown, invalid_token = validate_engineref_list(SEARCHQUERY, preferences_with_tokens)
self.assertEqual(len(valid), 1)
self.assertEqual(len(unknown), 0)
self.assertEqual(len(invalid_token), 0)

View file

@ -65,7 +65,7 @@ form = {
preferences = searx.preferences.Preferences(['oscar'], searx.engines.categories.keys(), searx.engines.engines, []) preferences = searx.preferences.Preferences(['oscar'], searx.engines.categories.keys(), searx.engines.engines, [])
preferences.key_value_settings['safesearch'].parse(args.safesearch) preferences.key_value_settings['safesearch'].parse(args.safesearch)
search_query, raw_text_query = searx.webadapter.get_search_query_from_webapp(preferences, form) search_query, raw_text_query, _, _ = searx.webadapter.get_search_query_from_webapp(preferences, form)
search = searx.search.Search(search_query) search = searx.search.Search(search_query)
result_container = search.search() result_container = search.search()