Merge pull request #139 from return42/refactor-webapp

[coding-style] searx/webapp.py - normalize indentations
This commit is contained in:
Markus Heiser 2021-06-13 09:19:27 +00:00 committed by GitHub
commit 2449ea70b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -157,10 +157,11 @@ app.secret_key = settings['server']['secret_key']
# see https://flask.palletsprojects.com/en/1.1.x/cli/
# True if "FLASK_APP=searx/webapp.py FLASK_ENV=development flask run"
flask_run_development = \
os.environ.get("FLASK_APP") is not None\
and os.environ.get("FLASK_ENV") == 'development'\
flask_run_development = (
os.environ.get("FLASK_APP") is not None
and os.environ.get("FLASK_ENV") == 'development'
and is_flask_run_cmdline()
)
# True if reload feature is activated of werkzeug, False otherwise (including uwsgi, etc..)
# __name__ != "__main__" if searx.webapp is imported (make test, make docs, uwsgi...)
@ -168,28 +169,33 @@ flask_run_development = \
werkzeug_reloader = flask_run_development or (searx_debug and __name__ == "__main__")
# initialize the engines except on the first run of the werkzeug server.
if not werkzeug_reloader\
or (werkzeug_reloader and os.environ.get("WERKZEUG_RUN_MAIN") == "true"):
if (not werkzeug_reloader
or (werkzeug_reloader
and os.environ.get("WERKZEUG_RUN_MAIN") == "true") ):
search_initialize(enable_checker=True)
babel = Babel(app)
rtl_locales = ['ar', 'arc', 'bcc', 'bqi', 'ckb', 'dv', 'fa', 'fa_IR', 'glk', 'he',
'ku', 'mzn', 'pnb', 'ps', 'sd', 'ug', 'ur', 'yi']
rtl_locales = [
'ar', 'arc', 'bcc', 'bqi', 'ckb', 'dv', 'fa', 'fa_IR', 'glk', 'he',
'ku', 'mzn', 'pnb', 'ps', 'sd', 'ug', 'ur', 'yi'
]
ui_locale_codes = [l.replace('_', '-') for l in settings['locales'].keys()]
# used when translating category names
_category_names = (gettext('files'),
gettext('general'),
gettext('music'),
gettext('social media'),
gettext('images'),
gettext('videos'),
gettext('it'),
gettext('news'),
gettext('map'),
gettext('onions'),
gettext('science'))
_category_names = (
gettext('files'),
gettext('general'),
gettext('music'),
gettext('social media'),
gettext('images'),
gettext('videos'),
gettext('it'),
gettext('news'),
gettext('map'),
gettext('onions'),
gettext('science')
)
#
timeout_text = gettext('timeout')
@ -230,7 +236,6 @@ def _get_translations():
if has_request_context() and request.form.get('use-translation') == 'oc':
babel_ext = flask_babel.current_app.extensions['babel']
return Translations.load(next(babel_ext.translation_directories), 'oc')
return _flask_babel_get_translations()
@ -309,9 +314,9 @@ def code_highlighter(codelines, language=None):
last_line + 1 != line:
# highlight last codepart
formatter = HtmlFormatter(linenos='inline',
linenostart=line_code_start,
cssclass="code-highlight")
formatter = HtmlFormatter(
linenos='inline', linenostart=line_code_start, cssclass="code-highlight"
)
html_code = html_code + highlight(tmp_code, lexer, formatter)
# reset conditions for next codepart
@ -374,12 +379,16 @@ def proxify(url):
url_params = dict(mortyurl=url.encode())
if settings['result_proxy'].get('key'):
url_params['mortyhash'] = hmac.new(settings['result_proxy']['key'],
url.encode(),
hashlib.sha256).hexdigest()
url_params['mortyhash'] = hmac.new(
settings['result_proxy']['key'],
url.encode(),
hashlib.sha256
).hexdigest()
return '{0}?{1}'.format(settings['result_proxy']['url'],
urlencode(url_params))
return '{0}?{1}'.format(
settings['result_proxy']['url'],
urlencode(url_params)
)
def image_proxify(url):
@ -423,12 +432,12 @@ def _get_ordered_categories():
def _get_enable_categories(all_categories):
disabled_engines = request.preferences.engines.get_disabled()
enabled_categories = set(category for engine_name in engines
for category in engines[engine_name].categories
if (engine_name, category) not in disabled_engines)
return [x for x in
all_categories
if x in enabled_categories]
enabled_categories = set(
category for engine_name in engines
for category in engines[engine_name].categories
if (engine_name, category) not in disabled_engines
)
return [x for x in all_categories if x in enabled_categories]
def render(template_name, override_theme=None, **kwargs):
@ -456,13 +465,13 @@ def render(template_name, override_theme=None, **kwargs):
if locale in rtl_locales and 'rtl' not in kwargs:
kwargs['rtl'] = True
if 'current_language' not in kwargs:
kwargs['current_language'] = match_language(request.preferences.get_value('language'),
LANGUAGE_CODES)
kwargs['current_language'] = match_language(
request.preferences.get_value('language'), LANGUAGE_CODES )
# values from settings
kwargs['search_formats'] = [
x for x in settings['search']['formats']
if x != 'html']
x for x in settings['search']['formats'] if x != 'html'
]
# brand
kwargs['instance_name'] = settings['general']['instance_name']
@ -475,8 +484,11 @@ def render(template_name, override_theme=None, **kwargs):
kwargs['proxify'] = proxify if settings.get('result_proxy', {}).get('url') else None
kwargs['proxify_results'] = settings.get('result_proxy', {}).get('proxify_results', True)
kwargs['get_result_template'] = get_result_template
kwargs['opensearch_url'] = url_for('opensearch') + '?' \
kwargs['opensearch_url'] = (
url_for('opensearch')
+ '?'
+ urlencode({'method': kwargs['method'], 'autocomplete': kwargs['autocomplete']})
)
# scripts from plugins
kwargs['scripts'] = set()
@ -567,10 +579,14 @@ def post_request(response):
'render;dur=' + str(round(request.render_time * 1000, 3))]
if len(request.timings) > 0:
timings = sorted(request.timings, key=lambda v: v['total'])
timings_total = ['total_' + str(i) + '_' + v['engine'] +
';dur=' + str(round(v['total'] * 1000, 3)) for i, v in enumerate(timings)]
timings_load = ['load_' + str(i) + '_' + v['engine'] +
';dur=' + str(round(v['load'] * 1000, 3)) for i, v in enumerate(timings) if v.get('load')]
timings_total = [
'total_' + str(i) + '_' + v['engine'] + ';dur=' + str(round(v['total'] * 1000, 3))
for i, v in enumerate(timings)
]
timings_load = [
'load_' + str(i) + '_' + v['engine'] + ';dur=' + str(round(v['load'] * 1000, 3))
for i, v in enumerate(timings) if v.get('load')
]
timings_all = timings_all + timings_total + timings_load
response.headers.add('Server-Timing', ', '.join(timings_all))
return response
@ -578,8 +594,10 @@ def post_request(response):
def index_error(output_format, error_message):
if output_format == 'json':
return Response(json.dumps({'error': error_message}),
mimetype='application/json')
return Response(
json.dumps({'error': error_message}),
mimetype='application/json'
)
if output_format == 'csv':
response = Response('', mimetype='application/csv')
cont_disp = 'attachment;Filename=searx.csv'
@ -651,7 +669,9 @@ def search():
raw_text_query = None
result_container = None
try:
search_query, raw_text_query, _, _ = get_search_query_from_webapp(request.preferences, request.form)
search_query, raw_text_query, _, _ = get_search_query_from_webapp(
request.preferences, request.form
)
# search = Search(search_query) # without plugins
search = SearchWithPlugins(search_query, request.user_plugins, request) # pylint: disable=redefined-outer-name
@ -715,22 +735,20 @@ def search():
result['publishedDate'] = format_date(result['publishedDate'])
if output_format == 'json':
return Response(
json.dumps(
{
'query': search_query.query,
'number_of_results': number_of_results,
'results': results,
'answers': list(result_container.answers),
'corrections': list(result_container.corrections),
'infoboxes': result_container.infoboxes,
'suggestions': list(result_container.suggestions),
'unresponsive_engines': __get_translated_errors(result_container.unresponsive_engines)
},
default = lambda item: list(item) if isinstance(item, set) else item
),
mimetype='application/json'
x = {
'query': search_query.query,
'number_of_results': number_of_results,
'results': results,
'answers': list(result_container.answers),
'corrections': list(result_container.corrections),
'infoboxes': result_container.infoboxes,
'suggestions': list(result_container.suggestions),
'unresponsive_engines': __get_translated_errors(result_container.unresponsive_engines)
}
response = json.dumps(
x, default = lambda item: list(item) if isinstance(item, set) else item
)
return Response(response, mimetype='application/json')
if output_format == 'csv':
csv = UnicodeWriter(StringIO())
@ -771,47 +789,59 @@ def search():
# HTML output format
# suggestions: use RawTextQuery to get the suggestion URLs with the same bang
suggestion_urls = list(map(lambda suggestion: {
'url': raw_text_query.changeQuery(suggestion).getFullQuery(),
'title': suggestion
},
result_container.suggestions))
suggestion_urls = list(
map(
lambda suggestion: {
'url': raw_text_query.changeQuery(suggestion).getFullQuery(),
'title': suggestion
},
result_container.suggestions
))
correction_urls = list(
map(
lambda correction: {
'url': raw_text_query.changeQuery(correction).getFullQuery(),
'title': correction
},
result_container.corrections
))
correction_urls = list(map(lambda correction: {
'url': raw_text_query.changeQuery(correction).getFullQuery(),
'title': correction
},
result_container.corrections))
#
return render(
'results.html',
results=results,
results = results,
q=request.form['q'],
selected_categories=search_query.categories,
pageno=search_query.pageno,
time_range=search_query.time_range,
number_of_results=format_decimal(number_of_results),
suggestions=suggestion_urls,
answers=result_container.answers,
corrections=correction_urls,
infoboxes=result_container.infoboxes,
engine_data=result_container.engine_data,
paging=result_container.paging,
unresponsive_engines=__get_translated_errors(result_container.unresponsive_engines),
current_language=match_language(search_query.lang,
LANGUAGE_CODES,
fallback=request.preferences.get_value("language")),
theme=get_current_theme_name(),
favicons=global_favicons[themes.index(get_current_theme_name())],
timeout_limit=request.form.get('timeout_limit', None)
selected_categories = search_query.categories,
pageno = search_query.pageno,
time_range = search_query.time_range,
number_of_results = format_decimal(number_of_results),
suggestions = suggestion_urls,
answers = result_container.answers,
corrections = correction_urls,
infoboxes = result_container.infoboxes,
engine_data = result_container.engine_data,
paging = result_container.paging,
unresponsive_engines = __get_translated_errors(
result_container.unresponsive_engines
),
current_language = match_language(
search_query.lang,
LANGUAGE_CODES,
fallback=request.preferences.get_value("language")
),
theme = get_current_theme_name(),
favicons = global_favicons[themes.index(get_current_theme_name())],
timeout_limit = request.form.get('timeout_limit', None)
)
def __get_translated_errors(unresponsive_engines):
translated_errors = []
# make a copy unresponsive_engines to avoid "RuntimeError: Set changed size during iteration"
# it happens when an engine modifies the ResultContainer after the search_multiple_requests method
# has stopped waiting
# make a copy unresponsive_engines to avoid "RuntimeError: Set changed size
# during iteration" it happens when an engine modifies the ResultContainer
# after the search_multiple_requests method has stopped waiting
for unresponsive_engine in list(unresponsive_engines):
error_user_text = exception_classname_to_text.get(unresponsive_engine[1])
if not error_user_text:
@ -822,15 +852,14 @@ def __get_translated_errors(unresponsive_engines):
if unresponsive_engine[3]:
error_msg = gettext('Suspended') + ': ' + error_msg
translated_errors.append((unresponsive_engine[0], error_msg))
return sorted(translated_errors, key=lambda e: e[0])
@app.route('/about', methods=['GET'])
def about():
"""Render about page"""
return render(
'about.html',
)
return render('about.html')
@app.route('/autocompleter', methods=['GET', 'POST'])
@ -910,7 +939,12 @@ def preferences():
allowed_plugins = request.preferences.plugins.get_enabled()
# stats for preferences page
filtered_engines = dict(filter(lambda kv: (kv[0], request.preferences.validate_token(kv[1])), engines.items()))
filtered_engines = dict(
filter(
lambda kv: (kv[0], request.preferences.validate_token(kv[1])),
engines.items()
)
)
engines_by_category = {}
for c in categories:
@ -1004,38 +1038,49 @@ def preferences():
'time_range_support': time_range_support,
}
#
return render('preferences.html',
selected_categories=get_selected_categories(request.preferences, request.form),
locales=settings['locales'],
current_locale=request.preferences.get_value("locale"),
image_proxy=image_proxy,
engines_by_category=engines_by_category,
stats=stats,
max_rate95=max_rate95,
reliabilities=reliabilities,
supports=supports,
answerers=[{'info': a.self_info(), 'keywords': a.keywords} for a in answerers],
disabled_engines=disabled_engines,
autocomplete_backends=autocomplete_backends,
shortcuts={y: x for x, y in engine_shortcuts.items()},
themes=themes,
plugins=plugins,
doi_resolvers=settings['doi_resolvers'],
current_doi_resolver=get_doi_resolver(request.args, request.preferences.get_value('doi_resolver')),
allowed_plugins=allowed_plugins,
theme=get_current_theme_name(),
preferences_url_params=request.preferences.get_as_url_params(),
locked_preferences=settings['preferences']['lock'],
preferences=True)
return render(
'preferences.html',
selected_categories = get_selected_categories(request.preferences, request.form),
locales = settings['locales'],
current_locale = request.preferences.get_value("locale"),
image_proxy = image_proxy,
engines_by_category = engines_by_category,
stats = stats,
max_rate95 = max_rate95,
reliabilities = reliabilities,
supports = supports,
answerers = [
{'info': a.self_info(), 'keywords': a.keywords}
for a in answerers
],
disabled_engines = disabled_engines,
autocomplete_backends = autocomplete_backends,
shortcuts = {y: x for x, y in engine_shortcuts.items()},
themes = themes,
plugins = plugins,
doi_resolvers = settings['doi_resolvers'],
current_doi_resolver = get_doi_resolver(
request.args, request.preferences.get_value('doi_resolver')
),
allowed_plugins = allowed_plugins,
theme = get_current_theme_name(),
preferences_url_params = request.preferences.get_as_url_params(),
locked_preferences = settings['preferences']['lock'],
preferences = True
)
def _is_selected_language_supported(engine, preferences): # pylint: disable=redefined-outer-name
language = preferences.get_value('language')
return (language == 'all'
or match_language(language,
getattr(engine, 'supported_languages', []),
getattr(engine, 'language_aliases', {}), None))
if language == 'all':
return True
x = match_language(
language,
getattr(engine, 'supported_languages', []),
getattr(engine, 'language_aliases', {}),
None
)
return bool(x)
@app.route('/image_proxy', methods=['GET'])
@ -1043,12 +1088,10 @@ def image_proxy():
# pylint: disable=too-many-return-statements
url = request.args.get('url')
if not url:
return '', 400
h = new_hmac(settings['server']['secret_key'], url.encode())
if h != request.args.get('h'):
return '', 400
@ -1058,32 +1101,41 @@ def image_proxy():
headers = dict_subset(request.headers, {'If-Modified-Since', 'If-None-Match'})
headers['User-Agent'] = gen_useragent()
stream = http_stream(
method='GET',
url=url,
headers=headers,
timeout=settings['outgoing']['request_timeout'],
allow_redirects=True,
max_redirects=20)
method = 'GET',
url = url,
headers = headers,
timeout = settings['outgoing']['request_timeout'],
allow_redirects = True,
max_redirects = 20
)
resp = next(stream)
content_length = resp.headers.get('Content-Length')
if content_length and content_length.isdigit() and int(content_length) > maximum_size:
if (content_length
and content_length.isdigit()
and int(content_length) > maximum_size ):
return 'Max size', 400
if resp.status_code == 304:
return '', resp.status_code
if resp.status_code != 200:
logger.debug('image-proxy: wrong response code: {0}'.format(resp.status_code))
logger.debug(
'image-proxy: wrong response code: {0}'.format(
resp.status_code))
if resp.status_code >= 400:
return '', resp.status_code
return '', 400
if not resp.headers.get('content-type', '').startswith('image/'):
logger.debug('image-proxy: wrong content-type: {0}'.format(resp.headers.get('content-type')))
logger.debug(
'image-proxy: wrong content-type: {0}'.format(
resp.headers.get('content-type')))
return '', 400
headers = dict_subset(resp.headers, {'Content-Length', 'Length', 'Date', 'Last-Modified', 'Expires', 'Etag'})
headers = dict_subset(
resp.headers,
{'Content-Length', 'Length', 'Date', 'Last-Modified', 'Expires', 'Etag'}
)
total_length = 0
@ -1106,7 +1158,11 @@ def stats():
sort_order = request.args.get('sort', default='name', type=str)
selected_engine_name = request.args.get('engine', default=None, type=str)
filtered_engines = dict(filter(lambda kv: (kv[0], request.preferences.validate_token(kv[1])), engines.items()))
filtered_engines = dict(
filter(
lambda kv: (kv[0], request.preferences.validate_token(kv[1])),
engines.items()
))
if selected_engine_name:
if selected_engine_name not in filtered_engines:
selected_engine_name = None
@ -1114,8 +1170,10 @@ def stats():
filtered_engines = [selected_engine_name]
checker_results = checker_get_result()
checker_results = checker_results['engines'] \
checker_results = (
checker_results['engines']
if checker_results['status'] == 'ok' and 'engines' in checker_results else {}
)
engine_stats = get_engines_stats(filtered_engines)
engine_reliabilities = get_reliabilities(filtered_engines, checker_results)
@ -1140,16 +1198,20 @@ def stats():
engine_stats['time'] = sorted(engine_stats['time'], reverse=reverse, key=get_key)
return render(
'stats.html',
sort_order=sort_order,
engine_stats=engine_stats,
engine_reliabilities=engine_reliabilities,
selected_engine_name=selected_engine_name,
sort_order = sort_order,
engine_stats = engine_stats,
engine_reliabilities = engine_reliabilities,
selected_engine_name = selected_engine_name,
)
@app.route('/stats/errors', methods=['GET'])
def stats_errors():
filtered_engines = dict(filter(lambda kv: (kv[0], request.preferences.validate_token(kv[1])), engines.items()))
filtered_engines = dict(
filter(
lambda kv: (kv[0], request.preferences.validate_token(kv[1])),
engines.items()
))
result = get_engine_errors(filtered_engines)
return jsonify(result)
@ -1188,9 +1250,11 @@ def opensearch():
override_theme='__common__'
)
resp = Response(response=ret,
status=200,
mimetype="application/opensearchdescription+xml")
resp = Response(
response = ret,
status = 200,
mimetype = "application/opensearchdescription+xml"
)
return resp
@ -1202,7 +1266,8 @@ def favicon():
settings['ui']['static_path'],
'themes',
get_current_theme_name(),
'img'),
'img'
),
'favicon.png',
mimetype = 'image/vnd.microsoft.icon'
)
@ -1271,14 +1336,18 @@ def page_not_found(_e):
def run():
logger.debug('starting webserver on %s:%s', settings['server']['bind_address'], settings['server']['port'])
logger.debug(
'starting webserver on %s:%s',
settings['server']['bind_address'],
settings['server']['port']
)
app.run(
debug=searx_debug,
use_debugger=searx_debug,
port=settings['server']['port'],
host=settings['server']['bind_address'],
threaded=True,
extra_files=[
debug = searx_debug,
use_debugger = searx_debug,
port = settings['server']['port'],
host = settings['server']['bind_address'],
threaded = True,
extra_files = [
get_default_settings_path()
],
)