Linted with black

This commit is contained in:
YOGESHWARAN R 2021-05-18 09:11:02 +05:30
parent 167f551a96
commit 202db95f52
No known key found for this signature in database
GPG key ID: A43D3BDEF049D20D
10 changed files with 325 additions and 168 deletions

View file

@ -3,7 +3,10 @@ from appdirs import user_data_dir
# override polyglot path
import polyglot
polyglot.polyglot_path = os.path.join(user_data_dir(appname="LibreTranslate", appauthor="uav4geo"), "polyglot_data")
polyglot.polyglot_path = os.path.join(
user_data_dir(appname="LibreTranslate", appauthor="uav4geo"), "polyglot_data"
)
from .main import main

View file

@ -4,24 +4,29 @@ from expiringdict import ExpiringDict
DEFAULT_DB_PATH = "api_keys.db"
class Database:
def __init__(self, db_path = DEFAULT_DB_PATH, max_cache_len=1000, max_cache_age=30):
def __init__(self, db_path=DEFAULT_DB_PATH, max_cache_len=1000, max_cache_age=30):
self.db_path = db_path
self.cache = ExpiringDict(max_len=max_cache_len, max_age_seconds=max_cache_age)
# Make sure to do data synchronization on writes!
self.c = sqlite3.connect(db_path, check_same_thread=False)
self.c.execute('''CREATE TABLE IF NOT EXISTS api_keys (
self.c.execute(
"""CREATE TABLE IF NOT EXISTS api_keys (
"api_key" TEXT NOT NULL,
"req_limit" INTEGER NOT NULL,
PRIMARY KEY("api_key")
);''')
);"""
)
def lookup(self, api_key):
req_limit = self.cache.get(api_key)
if req_limit is None:
# DB Lookup
stmt = self.c.execute('SELECT req_limit FROM api_keys WHERE api_key = ?', (api_key, ))
stmt = self.c.execute(
"SELECT req_limit FROM api_keys WHERE api_key = ?", (api_key,)
)
row = stmt.fetchone()
if row is not None:
self.cache[api_key] = row[0]
@ -29,26 +34,29 @@ class Database:
else:
self.cache[api_key] = False
req_limit = False
if isinstance(req_limit, bool):
req_limit = None
return req_limit
def add(self, req_limit, api_key = "auto"):
def add(self, req_limit, api_key="auto"):
if api_key == "auto":
api_key = str(uuid.uuid4())
self.remove(api_key)
self.c.execute("INSERT INTO api_keys (api_key, req_limit) VALUES (?, ?)", (api_key, req_limit))
self.c.execute(
"INSERT INTO api_keys (api_key, req_limit) VALUES (?, ?)",
(api_key, req_limit),
)
self.c.commit()
return (api_key, req_limit)
def remove(self, api_key):
self.c.execute('DELETE FROM api_keys WHERE api_key = ?', (api_key, ))
self.c.execute("DELETE FROM api_keys WHERE api_key = ?", (api_key,))
self.c.commit()
return api_key
def all(self):
row = self.c.execute("SELECT api_key, req_limit FROM api_keys")
return row.fetchall()
return row.fetchall()

View file

@ -8,20 +8,23 @@ from app.language import detect_languages, transliterate
from app import flood
from functools import wraps
def get_json_dict(request):
d = request.get_json()
if not isinstance(d, dict):
abort(400, description="Invalid JSON format")
return d
def get_remote_address():
if request.headers.getlist("X-Forwarded-For"):
ip = request.headers.getlist("X-Forwarded-For")[0]
else:
ip = request.remote_addr or '127.0.0.1'
ip = request.remote_addr or "127.0.0.1"
return ip
def get_routes_limits(default_req_limit, daily_req_limit, api_keys_db):
if default_req_limit == -1:
# TODO: better way?
@ -33,7 +36,7 @@ def get_routes_limits(default_req_limit, daily_req_limit, api_keys_db):
if api_keys_db:
if request.is_json:
json = get_json_dict(request)
api_key = json.get('api_key')
api_key = json.get("api_key")
else:
api_key = request.values.get("api_key")
@ -47,53 +50,67 @@ def get_routes_limits(default_req_limit, daily_req_limit, api_keys_db):
res = [limits]
if daily_req_limit > 0:
res.append("%s per day" % daily_req_limit)
res.append("%s per day" % daily_req_limit)
return res
def create_app(args):
from app.init import boot
boot(args.load_only)
from app.language import languages
app = Flask(__name__)
if args.debug:
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["TEMPLATES_AUTO_RELOAD"] = True
# Map userdefined frontend languages to argos language object.
if args.frontend_language_source == "auto":
frontend_argos_language_source = type('obj', (object,), {
'code': 'auto',
'name': 'Auto Detect'
})
frontend_argos_language_source = type(
"obj", (object,), {"code": "auto", "name": "Auto Detect"}
)
else:
frontend_argos_language_source = next(iter([l for l in languages if l.code == args.frontend_language_source]), None)
frontend_argos_language_source = next(
iter([l for l in languages if l.code == args.frontend_language_source]),
None,
)
frontend_argos_language_target = next(iter([l for l in languages if l.code == args.frontend_language_target]), None)
frontend_argos_language_target = next(
iter([l for l in languages if l.code == args.frontend_language_target]), None
)
# Raise AttributeError to prevent app startup if user input is not valid.
if frontend_argos_language_source is None:
raise AttributeError(f"{args.frontend_language_source} as frontend source language is not supported.")
raise AttributeError(
f"{args.frontend_language_source} as frontend source language is not supported."
)
if frontend_argos_language_target is None:
raise AttributeError(f"{args.frontend_language_target} as frontend target language is not supported.")
raise AttributeError(
f"{args.frontend_language_target} as frontend target language is not supported."
)
api_keys_db = None
if args.req_limit > 0 or args.api_keys or args.daily_req_limit > 0:
api_keys_db = Database() if args.api_keys else None
from flask_limiter import Limiter
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=get_routes_limits(args.req_limit, args.daily_req_limit, api_keys_db)
default_limits=get_routes_limits(
args.req_limit, args.daily_req_limit, api_keys_db
),
)
else:
from .no_limiter import Limiter
limiter = Limiter()
from .no_limiter import Limiter
limiter = Limiter()
if args.req_flood_threshold > 0:
flood.setup(args.req_flood_threshold)
@ -102,7 +119,7 @@ def create_app(args):
def func(*a, **kw):
if flood.is_banned(get_remote_address()):
abort(403, description="Too many request limits violations")
if args.api_keys and args.require_api_key_origin:
if request.is_json:
json = get_json_dict(request)
@ -110,10 +127,17 @@ def create_app(args):
else:
ak = request.values.get("api_key")
if api_keys_db.lookup(ak) is None and request.headers.get("Origin") != args.require_api_key_origin:
abort(403, description="Please contact the server operator to obtain an API key")
if (
api_keys_db.lookup(ak) is None
and request.headers.get("Origin") != args.require_api_key_origin
):
abort(
403,
description="Please contact the server operator to obtain an API key",
)
return f(*a, **kw)
return func
@app.errorhandler(400)
@ -133,18 +157,23 @@ def create_app(args):
def denied(e):
return jsonify({"error": str(e.description)}), 403
@app.route("/")
@limiter.exempt
def index():
return render_template('index.html', gaId=args.ga_id, frontendTimeout=args.frontend_timeout, api_keys=args.api_keys, web_version=os.environ.get('LT_WEB') is not None)
return render_template(
"index.html",
gaId=args.ga_id,
frontendTimeout=args.frontend_timeout,
api_keys=args.api_keys,
web_version=os.environ.get("LT_WEB") is not None,
)
@app.route("/javascript-licenses", methods=['GET'])
@app.route("/javascript-licenses", methods=["GET"])
@limiter.exempt
def javascript_licenses():
return render_template('javascript-licenses.html')
return render_template("javascript-licenses.html")
@app.route("/languages", methods=['GET', 'POST'])
@app.route("/languages", methods=["GET", "POST"])
@limiter.exempt
def langs():
"""
@ -177,21 +206,22 @@ def create_app(args):
type: string
description: Reason for slow down
"""
return jsonify([{'code': l.code, 'name': l.name} for l in languages])
return jsonify([{"code": l.code, "name": l.name} for l in languages])
# Add cors
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin','*')
response.headers.add('Access-Control-Allow-Headers', "Authorization, Content-Type")
response.headers.add('Access-Control-Expose-Headers', "Authorization")
response.headers.add('Access-Control-Allow-Methods', "GET, POST")
response.headers.add('Access-Control-Allow-Credentials', "true")
response.headers.add('Access-Control-Max-Age', 60 * 60 * 24 * 20)
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add(
"Access-Control-Allow-Headers", "Authorization, Content-Type"
)
response.headers.add("Access-Control-Expose-Headers", "Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET, POST")
response.headers.add("Access-Control-Allow-Credentials", "true")
response.headers.add("Access-Control-Max-Age", 60 * 60 * 24 * 20)
return response
@app.route("/translate", methods=['POST'])
@app.route("/translate", methods=["POST"])
@access_check
def translate():
"""
@ -282,9 +312,9 @@ def create_app(args):
"""
if request.is_json:
json = get_json_dict(request)
q = json.get('q')
source_lang = json.get('source')
target_lang = json.get('target')
q = json.get("q")
source_lang = json.get("source")
target_lang = json.get("target")
else:
q = request.values.get("q")
source_lang = request.values.get("source")
@ -300,20 +330,28 @@ def create_app(args):
batch = isinstance(q, list)
if batch and args.batch_limit != -1:
batch_size = len(q)
if args.batch_limit < batch_size:
abort(400, description="Invalid request: Request (%d) exceeds text limit (%d)" % (batch_size, args.batch_limit))
batch_size = len(q)
if args.batch_limit < batch_size:
abort(
400,
description="Invalid request: Request (%d) exceeds text limit (%d)"
% (batch_size, args.batch_limit),
)
if args.char_limit != -1:
if batch:
chars = sum([len(text) for text in q])
chars = sum([len(text) for text in q])
else:
chars = len(q)
chars = len(q)
if args.char_limit < chars:
abort(400, description="Invalid request: Request (%d) exceeds character limit (%d)" % (chars, args.char_limit))
abort(
400,
description="Invalid request: Request (%d) exceeds character limit (%d)"
% (chars, args.char_limit),
)
if source_lang == 'auto':
if source_lang == "auto":
candidate_langs = detect_languages(q)
if args.debug:
@ -335,14 +373,29 @@ def create_app(args):
translator = src_lang.get_translation(tgt_lang)
try:
if batch:
return jsonify({"translatedText": [translator.translate(transliterate(text, target_lang=source_lang)) for text in q] })
else:
return jsonify({"translatedText": translator.translate(transliterate(q, target_lang=source_lang)) })
if batch:
return jsonify(
{
"translatedText": [
translator.translate(
transliterate(text, target_lang=source_lang)
)
for text in q
]
}
)
else:
return jsonify(
{
"translatedText": translator.translate(
transliterate(q, target_lang=source_lang)
)
}
)
except Exception as e:
abort(500, description="Cannot translate text: %s" % str(e))
@app.route("/detect", methods=['POST'])
@app.route("/detect", methods=["POST"])
@access_check
def detect():
"""
@ -393,7 +446,7 @@ def create_app(args):
properties:
error:
type: string
description: Error message
description: Error message
500:
description: Detection error
schema:
@ -427,7 +480,7 @@ def create_app(args):
if request.is_json:
json = get_json_dict(request)
q = json.get('q')
q = json.get("q")
else:
q = request.values.get("q")
@ -436,7 +489,6 @@ def create_app(args):
return jsonify(detect_languages(q))
@app.route("/frontend/settings")
@limiter.exempt
def frontend_settings():
@ -480,30 +532,37 @@ def create_app(args):
type: string
description: Human-readable language name (in English)
"""
return jsonify({'charLimit': args.char_limit,
'frontendTimeout': args.frontend_timeout,
'language': {
'source': {'code': frontend_argos_language_source.code, 'name': frontend_argos_language_source.name},
'target': {'code': frontend_argos_language_target.code, 'name': frontend_argos_language_target.name}}
})
return jsonify(
{
"charLimit": args.char_limit,
"frontendTimeout": args.frontend_timeout,
"language": {
"source": {
"code": frontend_argos_language_source.code,
"name": frontend_argos_language_source.name,
},
"target": {
"code": frontend_argos_language_target.code,
"name": frontend_argos_language_target.name,
},
},
}
)
swag = swagger(app)
swag['info']['version'] = "1.2"
swag['info']['title'] = "LibreTranslate"
swag["info"]["version"] = "1.2"
swag["info"]["title"] = "LibreTranslate"
@app.route("/spec")
@limiter.exempt
def spec():
return jsonify(swag)
SWAGGER_URL = '/docs' # URL for exposing Swagger UI (without trailing '/')
API_URL = '/spec'
SWAGGER_URL = "/docs" # URL for exposing Swagger UI (without trailing '/')
API_URL = "/spec"
# Call factory function to create our blueprint
swaggerui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL
)
swaggerui_blueprint = get_swaggerui_blueprint(SWAGGER_URL, API_URL)
app.register_blueprint(swaggerui_blueprint)

View file

@ -7,11 +7,13 @@ banned = {}
active = False
threshold = -1
def clear_banned():
global banned
banned = {}
def setup(violations_threshold = 100):
def setup(violations_threshold=100):
global active
global threshold
@ -31,6 +33,7 @@ def report(request_ip):
banned[request_ip] = banned.get(request_ip, 0)
banned[request_ip] += 1
def is_banned(request_ip):
# More than X offences?
return active and banned.get(request_ip, 0) >= threshold

View file

@ -5,6 +5,7 @@ import os, glob, shutil, zipfile
import app.language
import polyglot
def boot(load_only=None):
try:
check_and_install_models(load_only_lang_codes=load_only)
@ -12,6 +13,7 @@ def boot(load_only=None):
except Exception as e:
print("Cannot update models (normal if you're offline): %s" % str(e))
def check_and_install_models(force=False, load_only_lang_codes=None):
if len(package.get_installed_packages()) < 2 or force:
# Update package definitions from remote
@ -29,7 +31,10 @@ def check_and_install_models(force=False, load_only_lang_codes=None):
for pack in available_packages:
unavailable_lang_codes -= {pack.from_code, pack.to_code}
if unavailable_lang_codes:
raise ValueError('Unavailable language codes: %s.' % ','.join(sorted(unavailable_lang_codes)))
raise ValueError(
"Unavailable language codes: %s."
% ",".join(sorted(unavailable_lang_codes))
)
# Keep only the packages that have both from_code and to_code in our list.
available_packages = [
pack
@ -38,39 +43,52 @@ def check_and_install_models(force=False, load_only_lang_codes=None):
and pack.to_code in load_only_lang_codes
]
if not available_packages:
raise ValueError('no available package')
raise ValueError("no available package")
print("Keep %s models" % len(available_packages))
# Download and install all available packages
for available_package in available_packages:
print("Downloading %s (%s) ..." % (available_package, available_package.package_version))
print(
"Downloading %s (%s) ..."
% (available_package, available_package.package_version)
)
download_path = available_package.download()
package.install_from_path(download_path)
# reload installed languages
app.language.languages = translate.load_installed_languages()
print("Loaded support for %s languages (%s models total)!" % (len(translate.load_installed_languages()), len(available_packages)))
print(
"Loaded support for %s languages (%s models total)!"
% (len(translate.load_installed_languages()), len(available_packages))
)
def check_and_install_transliteration(force=False):
# 'en' is not a supported transliteration language
transliteration_languages = [l.code for l in app.language.languages if l.code != "en"]
transliteration_languages = [
l.code for l in app.language.languages if l.code != "en"
]
# check installed
install_needed = []
if not force:
t_packages_path = Path(polyglot.polyglot_path) / "transliteration2"
for lang in transliteration_languages:
if not (t_packages_path / lang / f"transliteration.{lang}.tar.bz2").exists():
if not (
t_packages_path / lang / f"transliteration.{lang}.tar.bz2"
).exists():
install_needed.append(lang)
else:
install_needed = transliteration_languages
# install the needed transliteration packages
if install_needed:
print(f"Installing transliteration models for the following languages: {', '.join(install_needed)}")
print(
f"Installing transliteration models for the following languages: {', '.join(install_needed)}"
)
from polyglot.downloader import Downloader
downloader = Downloader()
for lang in install_needed:

View file

@ -31,17 +31,14 @@ def detect_languages(text):
read_bytes_total = sum(c.read_bytes for c in candidates)
# only use candidates that are supported by argostranslate
candidate_langs = list(filter(lambda l: l.read_bytes != 0 and l.code in __lang_codes, candidates))
candidate_langs = list(
filter(lambda l: l.read_bytes != 0 and l.code in __lang_codes, candidates)
)
# this happens if no language could be detected
if not candidate_langs:
# use language "en" by default but with zero confidence
return [
{
'confidence': 0.0,
'language': "en"
}
]
return [{"confidence": 0.0, "language": "en"}]
# for multiple occurrences of the same language (can happen on batch detection)
# calculate the average confidence for each language
@ -65,15 +62,11 @@ def detect_languages(text):
candidate_langs = temp_average_list
# sort the candidates descending based on the detected confidence
candidate_langs.sort(key=lambda l: (l.confidence * l.read_bytes) / read_bytes_total, reverse=True)
candidate_langs.sort(
key=lambda l: (l.confidence * l.read_bytes) / read_bytes_total, reverse=True
)
return [
{
'confidence': l.confidence,
'language': l.code
}
for l in candidate_langs
]
return [{"confidence": l.confidence, "language": l.code} for l in candidate_langs]
def __transliterate_line(transliterator, line_text):
@ -97,9 +90,9 @@ def __transliterate_line(transliterator, line_text):
else:
# add back any stripped punctuation
if r_diff:
t_word = t_word + ''.join(r_diff)
t_word = t_word + "".join(r_diff)
if l_diff:
t_word = ''.join(l_diff) + t_word
t_word = "".join(l_diff) + t_word
new_text.append(t_word)

View file

@ -2,42 +2,102 @@ import argparse
import operator
from app.app import create_app
def main():
parser = argparse.ArgumentParser(description='LibreTranslate - Free and Open Source Translation API')
parser.add_argument('--host', type=str,
help='Hostname (%(default)s)', default="127.0.0.1")
parser.add_argument('--port', type=int,
help='Port (%(default)s)', default=5000)
parser.add_argument('--char-limit', default=-1, type=int, metavar="<number of characters>",
help='Set character limit (%(default)s)')
parser.add_argument('--req-limit', default=-1, type=int, metavar="<number>",
help='Set the default maximum number of requests per minute per client (%(default)s)')
parser.add_argument('--daily-req-limit', default=-1, type=int, metavar="<number>",
help='Set the default maximum number of requests per day per client, in addition to req-limit. (%(default)s)')
parser.add_argument('--req-flood-threshold', default=-1, type=int, metavar="<number>",
help='Set the maximum number of request limit offences per 4 weeks that a client can exceed before being banned. (%(default)s)')
parser.add_argument('--batch-limit', default=-1, type=int, metavar="<number of texts>",
help='Set maximum number of texts to translate in a batch request (%(default)s)')
parser.add_argument('--ga-id', type=str, default=None, metavar="<GA ID>",
help='Enable Google Analytics on the API client page by providing an ID (%(default)s)')
parser.add_argument('--debug', default=False, action="store_true",
help="Enable debug environment")
parser.add_argument('--ssl', default=None, action="store_true",
help="Whether to enable SSL")
parser.add_argument('--frontend-language-source', type=str, default="en", metavar="<language code>",
help='Set frontend default language - source (%(default)s)')
parser.add_argument('--frontend-language-target', type=str, default="es", metavar="<language code>",
help='Set frontend default language - target (%(default)s)')
parser.add_argument('--frontend-timeout', type=int, default=500, metavar="<milliseconds>",
help='Set frontend translation timeout (%(default)s)')
parser.add_argument('--api-keys', default=False, action="store_true",
help="Enable API keys database for per-user rate limits lookup")
parser.add_argument('--require-api-key-origin', type=str, default="",
help="Require use of an API key for programmatic access to the API, unless the request origin matches this domain")
parser.add_argument('--load-only', type=operator.methodcaller('split', ','),
metavar='<comma-separated language codes>',
help='Set available languages (ar,de,en,es,fr,ga,hi,it,ja,ko,pt,ru,zh)')
parser = argparse.ArgumentParser(
description="LibreTranslate - Free and Open Source Translation API"
)
parser.add_argument(
"--host", type=str, help="Hostname (%(default)s)", default="127.0.0.1"
)
parser.add_argument("--port", type=int, help="Port (%(default)s)", default=5000)
parser.add_argument(
"--char-limit",
default=-1,
type=int,
metavar="<number of characters>",
help="Set character limit (%(default)s)",
)
parser.add_argument(
"--req-limit",
default=-1,
type=int,
metavar="<number>",
help="Set the default maximum number of requests per minute per client (%(default)s)",
)
parser.add_argument(
"--daily-req-limit",
default=-1,
type=int,
metavar="<number>",
help="Set the default maximum number of requests per day per client, in addition to req-limit. (%(default)s)",
)
parser.add_argument(
"--req-flood-threshold",
default=-1,
type=int,
metavar="<number>",
help="Set the maximum number of request limit offences per 4 weeks that a client can exceed before being banned. (%(default)s)",
)
parser.add_argument(
"--batch-limit",
default=-1,
type=int,
metavar="<number of texts>",
help="Set maximum number of texts to translate in a batch request (%(default)s)",
)
parser.add_argument(
"--ga-id",
type=str,
default=None,
metavar="<GA ID>",
help="Enable Google Analytics on the API client page by providing an ID (%(default)s)",
)
parser.add_argument(
"--debug", default=False, action="store_true", help="Enable debug environment"
)
parser.add_argument(
"--ssl", default=None, action="store_true", help="Whether to enable SSL"
)
parser.add_argument(
"--frontend-language-source",
type=str,
default="en",
metavar="<language code>",
help="Set frontend default language - source (%(default)s)",
)
parser.add_argument(
"--frontend-language-target",
type=str,
default="es",
metavar="<language code>",
help="Set frontend default language - target (%(default)s)",
)
parser.add_argument(
"--frontend-timeout",
type=int,
default=500,
metavar="<milliseconds>",
help="Set frontend translation timeout (%(default)s)",
)
parser.add_argument(
"--api-keys",
default=False,
action="store_true",
help="Enable API keys database for per-user rate limits lookup",
)
parser.add_argument(
"--require-api-key-origin",
type=str,
default="",
help="Require use of an API key for programmatic access to the API, unless the request origin matches this domain",
)
parser.add_argument(
"--load-only",
type=operator.methodcaller("split", ","),
metavar="<comma-separated language codes>",
help="Set available languages (ar,de,en,es,fr,ga,hi,it,ja,ko,pt,ru,zh)",
)
args = parser.parse_args()
app = create_app(args)
@ -46,7 +106,14 @@ def main():
app.run(host=args.host, port=args.port)
else:
from waitress import serve
serve(app, host=args.host, port=args.port, url_scheme='https' if args.ssl else 'http')
serve(
app,
host=args.host,
port=args.port,
url_scheme="https" if args.ssl else "http",
)
if __name__ == "__main__":
main()

View file

@ -1,31 +1,34 @@
import argparse
from app.api_keys import Database
def manage():
parser = argparse.ArgumentParser(description='LibreTranslate Manage Tools')
subparsers = parser.add_subparsers(help='', dest='command', required=True, title="Command List")
parser = argparse.ArgumentParser(description="LibreTranslate Manage Tools")
subparsers = parser.add_subparsers(
help="", dest="command", required=True, title="Command List"
)
keys_parser = subparsers.add_parser('keys', help='Manage API keys database')
keys_subparser = keys_parser.add_subparsers(help='', dest='sub_command', title="Command List")
keys_parser = subparsers.add_parser("keys", help="Manage API keys database")
keys_subparser = keys_parser.add_subparsers(
help="", dest="sub_command", title="Command List"
)
keys_add_parser = keys_subparser.add_parser('add', help='Add API keys to database')
keys_add_parser.add_argument('req_limit',
type=int,
help='Request Limits (per second)')
keys_add_parser.add_argument('--key',
type=str,
default="auto",
required=False,
help='API Key')
keys_add_parser = keys_subparser.add_parser("add", help="Add API keys to database")
keys_add_parser.add_argument(
"req_limit", type=int, help="Request Limits (per second)"
)
keys_add_parser.add_argument(
"--key", type=str, default="auto", required=False, help="API Key"
)
keys_remove_parser = keys_subparser.add_parser('remove', help='Remove API keys to database')
keys_remove_parser.add_argument('key',
type=str,
help='API Key')
keys_remove_parser = keys_subparser.add_parser(
"remove", help="Remove API keys to database"
)
keys_remove_parser.add_argument("key", type=str, help="API Key")
args = parser.parse_args()
if args.command == 'keys':
if args.command == "keys":
db = Database()
if args.sub_command is None:
# Print keys
@ -36,10 +39,10 @@ def manage():
for item in keys:
print("%s: %s" % item)
elif args.sub_command == 'add':
elif args.sub_command == "add":
print(db.add(args.req_limit, args.key)[0])
elif args.sub_command == 'remove':
elif args.sub_command == "remove":
print(db.remove(args.key))
else:
parser.print_help()
exit(1)
exit(1)

View file

@ -1,8 +1,10 @@
from functools import wraps
class Limiter:
def exempt(self, f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
def exempt(self, f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper

View file

@ -2,8 +2,9 @@ import pytest
from app.init import boot
from argostranslate import package
def test_boot_argos():
"""Test Argos translate models initialization"""
boot()
assert len(package.get_installed_packages()) > 2
assert len(package.get_installed_packages()) > 2