searxng/searx/botdetection/link_token.py
Markus Heiser b8c7c2c9aa [mod] botdetection - improve ip_limit and link_token methods
- counting requests in LONG_WINDOW and BURST_WINDOW is not needed when the
  request is validated by the link_token method [1]

- renew a ping-key on validation [2], this is needed for infinite scrolling,
  where no new token (CSS) is loaded. / this does not fix the BURST_MAX issue in
  the vanilla limiter

- normalize the counter names of the ip_limit method to 'ip_limit.*'

- integrate the ip_limit method directly into the limiter plugin, without
  intermediate code --> ip_limit now returns None or a werkzeug.Response
  object that the plugin can pass on to the flask application (instead of
  intermediate code that returns a tuple)

[1] https://github.com/searxng/searxng/pull/2357#issuecomment-1566113277
[2] https://github.com/searxng/searxng/pull/2357#discussion_r1208542206
[3] https://github.com/searxng/searxng/pull/2357#issuecomment-1566125979

Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
2023-06-01 14:38:53 +02:00

148 lines
4.2 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``link_token``
---------------------

The ``link_token`` method evaluates a request as :py:obj:`suspicious
<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the
client.  By adding a random component (the token) to the URL, a bot can not
send a ping by requesting a static URL.

.. note::

   This method requires a redis DB and needs a HTTP X-Forwarded-For_ header.

To use this method, a flask URL route needs to be added:

.. code:: python

   @app.route('/client<token>.css', methods=['GET', 'POST'])
   def client_token(token=None):
       link_token.ping(request, token)
       return Response('', mimetype='text/css')

And in the HTML template from flask a stylesheet link is needed (the value of
``link_token`` comes from :py:obj:`get_token`):

.. code:: html

   <link rel="stylesheet"
         href="{{ url_for('client_token', token=link_token) }}"
         type="text/css" />

.. _X-Forwarded-For:
   https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For

"""
import random
import secrets
import string

import flask

from searx import logger
from searx import redisdb
from searx.redislib import secret_hash
# Expire time passed to redis when the token is stored: 600 s = 10 minutes.
TOKEN_LIVE_TIME = 600
"""Livetime (sec) of limiter's CSS token."""

# Expire time passed to redis when a ping-key is stored: 3600 s = 1 hour.
PING_LIVE_TIME = 3600
"""Livetime (sec) of the ping-key from a client (request)"""

# get_ping_key() appends a bracketed hash of request headers to this prefix.
PING_KEY = 'SearXNG_limiter.ping'
"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`"""

TOKEN_KEY = 'SearXNG_limiter.token'
"""Key for which the current token is stored in the DB"""

# Rebind the logger imported from ``searx`` to a child logger of this module.
logger = logger.getChild('botdetection.link_token')
def is_suspicious(request: flask.Request, renew: bool = False):
    """Rate a request as *suspicious* when its client has no valid ping.

    The ping-key of the request (see :py:obj:`get_ping_key`) is looked up in
    the redis DB:

    - without a redis DB the request can't be checked and is rated as not
      suspicious (``False``)
    - if the ping-key is missing, a warning is logged and ``True`` is returned
    - if the ping-key exists, ``False`` is returned; with ``renew=True`` the
      ping-key is stored again so that its expire time is reset to
      :py:obj:`PING_LIVE_TIME`
    """
    db = redisdb.client()
    if not db:
        return False

    key = get_ping_key(request)

    if db.get(key):
        if renew:
            # writing the key again restarts its expire time
            db.set(key, 1, ex=PING_LIVE_TIME)
        logger.debug("found ping for client request: %s", key)
        return False

    client_ip = request.headers.get('X-Forwarded-For', '')
    logger.warning("missing ping (IP: %s) / request: %s", client_ip, key)
    return True
def ping(request: flask.Request, token: str):
    """Register a ping of the client, called for URL ``/client<token>.css``.

    A :py:obj:`PING_KEY` for the client is stored in the DB with an expire
    time of :py:obj:`PING_LIVE_TIME`, but only if a redis DB is available and
    ``token`` is valid (see :py:obj:`token_is_valid`).  In any other case the
    function does nothing.
    """
    db = redisdb.client()
    # the token is only validated when there is a DB to store the ping in
    if db and token_is_valid(token):
        key = get_ping_key(request)
        logger.debug("store ping for: %s", key)
        db.set(key, 1, ex=PING_LIVE_TIME)
def get_ping_key(request: flask.Request):
    """Build the hashed DB key that identifies (more or less) the client of a
    request.

    The values of the HTTP headers X-Forwarded-For_, ``Accept-Language`` and
    ``User-Agent`` are concatenated (a missing header counts as empty string),
    hashed by ``secret_hash`` and wrapped into ``PING_KEY[<hash>]``.  At least
    X-Forwarded-For_ is needed to be able to assign the request to an IP.
    """
    header_names = ('X-Forwarded-For', 'Accept-Language', 'User-Agent')
    fingerprint = ''.join(request.headers.get(name, '') for name in header_names)
    return ''.join([PING_KEY, "[", secret_hash(fingerprint), "]"])
def token_is_valid(token) -> bool:
    """Tell whether ``token`` equals the current token of :py:obj:`get_token`.

    The result of the comparison is logged on debug level.
    """
    current = get_token()
    matches = token == current
    logger.debug("token is valid --> %s", matches)
    return matches
def get_token() -> str:
    """Returns current token.  If there is no currently active token a new token
    is generated randomly and stored in the redis DB.

    - :py:obj:`TOKEN_LIVE_TIME`
    - :py:obj:`TOKEN_KEY`

    Without a redis DB the constant placeholder ``'12345678'`` is returned.
    A new token consists of 16 characters (lowercase ASCII letters and digits)
    and is stored under :py:obj:`TOKEN_KEY` with an expire time of
    :py:obj:`TOKEN_LIVE_TIME`.
    """
    redis_client = redisdb.client()
    if not redis_client:
        # This function is also called when limiter is inactive / no redis DB
        # (see render function in webapp.py)
        return '12345678'

    token = redis_client.get(TOKEN_KEY)
    if token:
        return token.decode('UTF-8')

    # The token is what keeps bots from requesting a static ping URL, so draw
    # it from the OS CSPRNG (``secrets``) and not from the predictable
    # Mersenne Twister behind ``random``.
    alphabet = string.ascii_lowercase + string.digits
    token = ''.join(secrets.choice(alphabet) for _ in range(16))
    redis_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME)
    return token