Merge main

This commit is contained in:
Piero Toffanin 2021-06-03 10:39:41 -04:00
commit ca1049f0d3
13 changed files with 376 additions and 180 deletions

View file

@ -115,6 +115,7 @@ docker-compose up -d --build
| --frontend-language-target | Set frontend default language - target | `es` |
| --frontend-timeout | Set frontend translation timeout | `500` |
| --api-keys | Enable API keys database for per-user rate limits lookup | `Don't use API keys` |
| --require-api-key-origin | Require use of an API key for programmatic access to the API, unless the request origin matches this domain | `No restrictions on domain origin` |
| --load-only | Set available languages | `all from argostranslate` |
## Manage API Keys
@ -151,6 +152,7 @@ You can use the LibreTranslate API using the following bindings:
- Node.js: https://github.com/franciscop/translate
- .Net: https://github.com/sigaloid/LibreTranslate.Net
- Go: https://github.com/SnakeSel/libretranslate
- Python: https://github.com/argosopentech/LibreTranslate-py
More coming soon!

View file

@ -1,9 +1,12 @@
import os
from appdirs import user_data_dir
# override polyglot path
import polyglot
polyglot.polyglot_path = os.path.join(user_data_dir(appname="LibreTranslate", appauthor="uav4geo"), "polyglot_data")
from appdirs import user_data_dir
polyglot.polyglot_path = os.path.join(
user_data_dir(appname="LibreTranslate", appauthor="uav4geo"), "polyglot_data"
)
from .main import main

View file

@ -1,27 +1,33 @@
import sqlite3
import uuid
from expiringdict import ExpiringDict
DEFAULT_DB_PATH = "api_keys.db"
class Database:
def __init__(self, db_path = DEFAULT_DB_PATH, max_cache_len=1000, max_cache_age=30):
def __init__(self, db_path=DEFAULT_DB_PATH, max_cache_len=1000, max_cache_age=30):
self.db_path = db_path
self.cache = ExpiringDict(max_len=max_cache_len, max_age_seconds=max_cache_age)
# Make sure to do data synchronization on writes!
self.c = sqlite3.connect(db_path, check_same_thread=False)
self.c.execute('''CREATE TABLE IF NOT EXISTS api_keys (
self.c.execute(
"""CREATE TABLE IF NOT EXISTS api_keys (
"api_key" TEXT NOT NULL,
"req_limit" INTEGER NOT NULL,
PRIMARY KEY("api_key")
);''')
);"""
)
def lookup(self, api_key):
req_limit = self.cache.get(api_key)
if req_limit is None:
# DB Lookup
stmt = self.c.execute('SELECT req_limit FROM api_keys WHERE api_key = ?', (api_key, ))
stmt = self.c.execute(
"SELECT req_limit FROM api_keys WHERE api_key = ?", (api_key,)
)
row = stmt.fetchone()
if row is not None:
self.cache[api_key] = row[0]
@ -29,26 +35,29 @@ class Database:
else:
self.cache[api_key] = False
req_limit = False
if isinstance(req_limit, bool):
req_limit = None
return req_limit
def add(self, req_limit, api_key = "auto"):
def add(self, req_limit, api_key="auto"):
if api_key == "auto":
api_key = str(uuid.uuid4())
self.remove(api_key)
self.c.execute("INSERT INTO api_keys (api_key, req_limit) VALUES (?, ?)", (api_key, req_limit))
self.c.execute(
"INSERT INTO api_keys (api_key, req_limit) VALUES (?, ?)",
(api_key, req_limit),
)
self.c.commit()
return (api_key, req_limit)
def remove(self, api_key):
self.c.execute('DELETE FROM api_keys WHERE api_key = ?', (api_key, ))
self.c.execute("DELETE FROM api_keys WHERE api_key = ?", (api_key,))
self.c.commit()
return api_key
def all(self):
row = self.c.execute("SELECT api_key, req_limit FROM api_keys")
return row.fetchall()
return row.fetchall()

View file

@ -1,11 +1,15 @@
import os
from flask import Flask, render_template, jsonify, request, abort, send_from_directory
from functools import wraps
from flask import Flask, abort, jsonify, render_template, request
from flask_swagger import swagger
from flask_swagger_ui import get_swaggerui_blueprint
from pkg_resources import resource_filename
from .api_keys import Database
from app.language import detect_languages, transliterate
from app import flood
from app.language import detect_languages, transliterate
from .api_keys import Database
def get_json_dict(request):
d = request.get_json()
@ -13,11 +17,12 @@ def get_json_dict(request):
abort(400, description="Invalid JSON format")
return d
def get_remote_address():
if request.headers.getlist("X-Forwarded-For"):
ip = request.headers.getlist("X-Forwarded-For")[0]
else:
ip = request.remote_addr or '127.0.0.1'
ip = request.remote_addr or "127.0.0.1"
return ip
@ -58,47 +63,91 @@ def get_routes_limits(default_req_limit, daily_req_limit, api_keys_db):
return res
def create_app(args):
from app.init import boot
boot(args.load_only)
from app.language import languages
app = Flask(__name__)
if args.debug:
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["TEMPLATES_AUTO_RELOAD"] = True
# Map userdefined frontend languages to argos language object.
if args.frontend_language_source == "auto":
frontend_argos_language_source = type('obj', (object,), {
'code': 'auto',
'name': 'Auto Detect'
})
frontend_argos_language_source = type(
"obj", (object,), {"code": "auto", "name": "Auto Detect"}
)
else:
frontend_argos_language_source = next(iter([l for l in languages if l.code == args.frontend_language_source]), None)
frontend_argos_language_source = next(
iter([l for l in languages if l.code == args.frontend_language_source]),
None,
)
frontend_argos_language_target = next(iter([l for l in languages if l.code == args.frontend_language_target]), None)
frontend_argos_language_target = next(
iter([l for l in languages if l.code == args.frontend_language_target]), None
)
# Raise AttributeError to prevent app startup if user input is not valid.
if frontend_argos_language_source is None:
raise AttributeError(f"{args.frontend_language_source} as frontend source language is not supported.")
raise AttributeError(
f"{args.frontend_language_source} as frontend source language is not supported."
)
if frontend_argos_language_target is None:
raise AttributeError(f"{args.frontend_language_target} as frontend target language is not supported.")
raise AttributeError(
f"{args.frontend_language_target} as frontend target language is not supported."
)
api_keys_db = None
if args.req_limit > 0 or args.api_keys or args.daily_req_limit > 0:
api_keys_db = Database() if args.api_keys else None
from flask_limiter import Limiter
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=get_routes_limits(args.req_limit, args.daily_req_limit, Database() if args.api_keys else None)
default_limits=get_routes_limits(
args.req_limit, args.daily_req_limit, api_keys_db
),
)
else:
from .no_limiter import Limiter
limiter = Limiter()
from .no_limiter import Limiter
limiter = Limiter()
if args.req_flood_threshold > 0:
flood.setup(args.req_flood_threshold)
def access_check(f):
@wraps(f)
def func(*a, **kw):
if flood.is_banned(get_remote_address()):
abort(403, description="Too many request limits violations")
if args.api_keys and args.require_api_key_origin:
if request.is_json:
json = get_json_dict(request)
ak = json.get("api_key")
else:
ak = request.values.get("api_key")
if (
api_keys_db.lookup(ak) is None and request.headers.get("Origin") != args.require_api_key_origin
):
abort(
403,
description="Please contact the server operator to obtain an API key",
)
return f(*a, **kw)
return func
@app.errorhandler(400)
def invalid_api(e):
return jsonify({"error": str(e.description)}), 400
@ -116,18 +165,23 @@ def create_app(args):
def denied(e):
return jsonify({"error": str(e.description)}), 403
@app.route("/")
@limiter.exempt
def index():
return render_template('index.html', gaId=args.ga_id, frontendTimeout=args.frontend_timeout, api_keys=args.api_keys, web_version=os.environ.get('LT_WEB') is not None)
return render_template(
"index.html",
gaId=args.ga_id,
frontendTimeout=args.frontend_timeout,
api_keys=args.api_keys,
web_version=os.environ.get("LT_WEB") is not None,
)
@app.route("/javascript-licenses", methods=['GET'])
@app.route("/javascript-licenses", methods=["GET"])
@limiter.exempt
def javascript_licenses():
return render_template('javascript-licenses.html')
return render_template("javascript-licenses.html")
@app.route("/languages", methods=['GET', 'POST'])
@app.route("/languages", methods=["GET", "POST"])
@limiter.exempt
def langs():
"""
@ -160,21 +214,23 @@ def create_app(args):
type: string
description: Reason for slow down
"""
return jsonify([{'code': l.code, 'name': l.name} for l in languages])
return jsonify([{"code": l.code, "name": l.name} for l in languages])
# Add cors
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin','*')
response.headers.add('Access-Control-Allow-Headers', "Authorization, Content-Type")
response.headers.add('Access-Control-Expose-Headers', "Authorization")
response.headers.add('Access-Control-Allow-Methods', "GET, POST")
response.headers.add('Access-Control-Allow-Credentials', "true")
response.headers.add('Access-Control-Max-Age', 60 * 60 * 24 * 20)
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add(
"Access-Control-Allow-Headers", "Authorization, Content-Type"
)
response.headers.add("Access-Control-Expose-Headers", "Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET, POST")
response.headers.add("Access-Control-Allow-Credentials", "true")
response.headers.add("Access-Control-Max-Age", 60 * 60 * 24 * 20)
return response
@app.route("/translate", methods=['POST'])
@app.route("/translate", methods=["POST"])
@access_check
def translate():
"""
Translate text from a language to another
@ -262,14 +318,11 @@ def create_app(args):
type: string
description: Error message
"""
if flood.is_banned(get_remote_address()):
abort(403, description="Too many request limits violations")
if request.is_json:
json = get_json_dict(request)
q = json.get('q')
source_lang = json.get('source')
target_lang = json.get('target')
q = json.get("q")
source_lang = json.get("source")
target_lang = json.get("target")
else:
q = request.values.get("q")
source_lang = request.values.get("source")
@ -285,20 +338,28 @@ def create_app(args):
batch = isinstance(q, list)
if batch and args.batch_limit != -1:
batch_size = len(q)
if args.batch_limit < batch_size:
abort(400, description="Invalid request: Request (%d) exceeds text limit (%d)" % (batch_size, args.batch_limit))
batch_size = len(q)
if args.batch_limit < batch_size:
abort(
400,
description="Invalid request: Request (%d) exceeds text limit (%d)"
% (batch_size, args.batch_limit),
)
if args.char_limit != -1:
if batch:
chars = sum([len(text) for text in q])
chars = sum([len(text) for text in q])
else:
chars = len(q)
chars = len(q)
if args.char_limit < chars:
abort(400, description="Invalid request: Request (%d) exceeds character limit (%d)" % (chars, args.char_limit))
abort(
400,
description="Invalid request: Request (%d) exceeds character limit (%d)"
% (chars, args.char_limit),
)
if source_lang == 'auto':
if source_lang == "auto":
candidate_langs = detect_languages(q)
if args.debug:
@ -320,14 +381,30 @@ def create_app(args):
translator = src_lang.get_translation(tgt_lang)
try:
if batch:
return jsonify({"translatedText": [translator.translate(transliterate(text, target_lang=source_lang)) for text in q] })
else:
return jsonify({"translatedText": translator.translate(transliterate(q, target_lang=source_lang)) })
if batch:
return jsonify(
{
"translatedText": [
translator.translate(
transliterate(text, target_lang=source_lang)
)
for text in q
]
}
)
else:
return jsonify(
{
"translatedText": translator.translate(
transliterate(q, target_lang=source_lang)
)
}
)
except Exception as e:
abort(500, description="Cannot translate text: %s" % str(e))
@app.route("/detect", methods=['POST'])
@app.route("/detect", methods=["POST"])
@access_check
def detect():
"""
Detect the language of a single text
@ -377,7 +454,7 @@ def create_app(args):
properties:
error:
type: string
description: Error message
description: Error message
500:
description: Detection error
schema:
@ -411,7 +488,7 @@ def create_app(args):
if request.is_json:
json = get_json_dict(request)
q = json.get('q')
q = json.get("q")
else:
q = request.values.get("q")
@ -420,7 +497,6 @@ def create_app(args):
return jsonify(detect_languages(q))
@app.route("/frontend/settings")
@limiter.exempt
def frontend_settings():
@ -464,30 +540,37 @@ def create_app(args):
type: string
description: Human-readable language name (in English)
"""
return jsonify({'charLimit': args.char_limit,
'frontendTimeout': args.frontend_timeout,
'language': {
'source': {'code': frontend_argos_language_source.code, 'name': frontend_argos_language_source.name},
'target': {'code': frontend_argos_language_target.code, 'name': frontend_argos_language_target.name}}
})
return jsonify(
{
"charLimit": args.char_limit,
"frontendTimeout": args.frontend_timeout,
"language": {
"source": {
"code": frontend_argos_language_source.code,
"name": frontend_argos_language_source.name,
},
"target": {
"code": frontend_argos_language_target.code,
"name": frontend_argos_language_target.name,
},
},
}
)
swag = swagger(app)
swag['info']['version'] = "1.2"
swag['info']['title'] = "LibreTranslate"
swag["info"]["version"] = "1.2"
swag["info"]["title"] = "LibreTranslate"
@app.route("/spec")
@limiter.exempt
def spec():
return jsonify(swag)
SWAGGER_URL = '/docs' # URL for exposing Swagger UI (without trailing '/')
API_URL = '/spec'
SWAGGER_URL = "/docs" # URL for exposing Swagger UI (without trailing '/')
API_URL = "/spec"
# Call factory function to create our blueprint
swaggerui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL
)
swaggerui_blueprint = get_swaggerui_blueprint(SWAGGER_URL, API_URL)
app.register_blueprint(swaggerui_blueprint)

View file

@ -1,4 +1,3 @@
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
@ -7,11 +6,13 @@ banned = {}
active = False
threshold = -1
def clear_banned():
global banned
banned = {}
def setup(violations_threshold = 100):
def setup(violations_threshold=100):
global active
global threshold
@ -31,6 +32,7 @@ def report(request_ip):
banned[request_ip] = banned.get(request_ip, 0)
banned[request_ip] += 1
def is_banned(request_ip):
# More than X offences?
return active and banned.get(request_ip, 0) >= threshold

View file

@ -1,9 +1,10 @@
import os
from pathlib import Path
from argostranslate import settings, package, translate
import os, glob, shutil, zipfile
import app.language
import polyglot
from argostranslate import package, translate
import app.language
def boot(load_only=None):
try:
@ -12,6 +13,7 @@ def boot(load_only=None):
except Exception as e:
print("Cannot update models (normal if you're offline): %s" % str(e))
def check_and_install_models(force=False, load_only_lang_codes=None):
if len(package.get_installed_packages()) < 2 or force:
# Update package definitions from remote
@ -29,48 +31,63 @@ def check_and_install_models(force=False, load_only_lang_codes=None):
for pack in available_packages:
unavailable_lang_codes -= {pack.from_code, pack.to_code}
if unavailable_lang_codes:
raise ValueError('Unavailable language codes: %s.' % ','.join(sorted(unavailable_lang_codes)))
raise ValueError(
"Unavailable language codes: %s."
% ",".join(sorted(unavailable_lang_codes))
)
# Keep only the packages that have both from_code and to_code in our list.
available_packages = [
pack
for pack in available_packages
if pack.from_code in load_only_lang_codes
and pack.to_code in load_only_lang_codes
if pack.from_code in load_only_lang_codes and pack.to_code in load_only_lang_codes
]
if not available_packages:
raise ValueError('no available package')
raise ValueError("no available package")
print("Keep %s models" % len(available_packages))
# Download and install all available packages
for available_package in available_packages:
print("Downloading %s (%s) ..." % (available_package, available_package.package_version))
print(
"Downloading %s (%s) ..."
% (available_package, available_package.package_version)
)
download_path = available_package.download()
package.install_from_path(download_path)
# reload installed languages
app.language.languages = translate.load_installed_languages()
print("Loaded support for %s languages (%s models total)!" % (len(translate.load_installed_languages()), len(available_packages)))
print(
"Loaded support for %s languages (%s models total)!"
% (len(translate.load_installed_languages()), len(available_packages))
)
def check_and_install_transliteration(force=False):
# 'en' is not a supported transliteration language
transliteration_languages = [l.code for l in app.language.languages if l.code != "en"]
transliteration_languages = [
l.code for l in app.language.languages if l.code != "en"
]
# check installed
install_needed = []
if not force:
t_packages_path = Path(polyglot.polyglot_path) / "transliteration2"
for lang in transliteration_languages:
if not (t_packages_path / lang / f"transliteration.{lang}.tar.bz2").exists():
if not (
t_packages_path / lang / f"transliteration.{lang}.tar.bz2"
).exists():
install_needed.append(lang)
else:
install_needed = transliteration_languages
# install the needed transliteration packages
if install_needed:
print(f"Installing transliteration models for the following languages: {', '.join(install_needed)}")
print(
f"Installing transliteration models for the following languages: {', '.join(install_needed)}"
)
from polyglot.downloader import Downloader
downloader = Downloader()
for lang in install_needed:

View file

@ -4,7 +4,6 @@ from argostranslate import translate
from polyglot.detect.base import Detector, UnknownLanguage
from polyglot.transliteration.base import Transliterator
languages = translate.load_installed_languages()
@ -24,24 +23,21 @@ def detect_languages(text):
for t in text:
try:
candidates.extend(Detector(t).languages)
except UnknownLanguage as e:
except UnknownLanguage:
pass
# total read bytes of the provided text
read_bytes_total = sum(c.read_bytes for c in candidates)
# only use candidates that are supported by argostranslate
candidate_langs = list(filter(lambda l: l.read_bytes != 0 and l.code in __lang_codes, candidates))
candidate_langs = list(
filter(lambda l: l.read_bytes != 0 and l.code in __lang_codes, candidates)
)
# this happens if no language could be detected
if not candidate_langs:
# use language "en" by default but with zero confidence
return [
{
'confidence': 0.0,
'language': "en"
}
]
return [{"confidence": 0.0, "language": "en"}]
# for multiple occurrences of the same language (can happen on batch detection)
# calculate the average confidence for each language
@ -65,15 +61,11 @@ def detect_languages(text):
candidate_langs = temp_average_list
# sort the candidates descending based on the detected confidence
candidate_langs.sort(key=lambda l: (l.confidence * l.read_bytes) / read_bytes_total, reverse=True)
candidate_langs.sort(
key=lambda l: (l.confidence * l.read_bytes) / read_bytes_total, reverse=True
)
return [
{
'confidence': l.confidence,
'language': l.code
}
for l in candidate_langs
]
return [{"confidence": l.confidence, "language": l.code} for l in candidate_langs]
def __transliterate_line(transliterator, line_text):
@ -97,9 +89,9 @@ def __transliterate_line(transliterator, line_text):
else:
# add back any stripped punctuation
if r_diff:
t_word = t_word + ''.join(r_diff)
t_word = t_word + "".join(r_diff)
if l_diff:
t_word = ''.join(l_diff) + t_word
t_word = "".join(l_diff) + t_word
new_text.append(t_word)

View file

@ -1,41 +1,104 @@
import argparse
import operator
from app.app import create_app
def main():
parser = argparse.ArgumentParser(description='LibreTranslate - Free and Open Source Translation API')
parser.add_argument('--host', type=str,
help='Hostname (%(default)s)', default="127.0.0.1")
parser.add_argument('--port', type=int,
help='Port (%(default)s)', default=5000)
parser.add_argument('--char-limit', default=-1, type=int, metavar="<number of characters>",
help='Set character limit (%(default)s)')
parser.add_argument('--req-limit', default=-1, type=int, metavar="<number>",
help='Set the default maximum number of requests per minute per client (%(default)s)')
parser.add_argument('--daily-req-limit', default=-1, type=int, metavar="<number>",
help='Set the default maximum number of requests per day per client, in addition to req-limit. (%(default)s)')
parser.add_argument('--req-flood-threshold', default=-1, type=int, metavar="<number>",
help='Set the maximum number of request limit offences per 4 weeks that a client can exceed before being banned. (%(default)s)')
parser.add_argument('--batch-limit', default=-1, type=int, metavar="<number of texts>",
help='Set maximum number of texts to translate in a batch request (%(default)s)')
parser.add_argument('--ga-id', type=str, default=None, metavar="<GA ID>",
help='Enable Google Analytics on the API client page by providing an ID (%(default)s)')
parser.add_argument('--debug', default=False, action="store_true",
help="Enable debug environment")
parser.add_argument('--ssl', default=None, action="store_true",
help="Whether to enable SSL")
parser.add_argument('--frontend-language-source', type=str, default="en", metavar="<language code>",
help='Set frontend default language - source (%(default)s)')
parser.add_argument('--frontend-language-target', type=str, default="es", metavar="<language code>",
help='Set frontend default language - target (%(default)s)')
parser.add_argument('--frontend-timeout', type=int, default=500, metavar="<milliseconds>",
help='Set frontend translation timeout (%(default)s)')
parser.add_argument('--api-keys', default=False, action="store_true",
help="Enable API keys database for per-user rate limits lookup")
parser.add_argument('--load-only', type=operator.methodcaller('split', ','),
metavar='<comma-separated language codes>',
help='Set available languages (ar,de,en,es,fr,ga,hi,it,ja,ko,pt,ru,zh)')
parser = argparse.ArgumentParser(
description="LibreTranslate - Free and Open Source Translation API"
)
parser.add_argument(
"--host", type=str, help="Hostname (%(default)s)", default="127.0.0.1"
)
parser.add_argument("--port", type=int, help="Port (%(default)s)", default=5000)
parser.add_argument(
"--char-limit",
default=-1,
type=int,
metavar="<number of characters>",
help="Set character limit (%(default)s)",
)
parser.add_argument(
"--req-limit",
default=-1,
type=int,
metavar="<number>",
help="Set the default maximum number of requests per minute per client (%(default)s)",
)
parser.add_argument(
"--daily-req-limit",
default=-1,
type=int,
metavar="<number>",
help="Set the default maximum number of requests per day per client, in addition to req-limit. (%(default)s)",
)
parser.add_argument(
"--req-flood-threshold",
default=-1,
type=int,
metavar="<number>",
help="Set the maximum number of request limit offences per 4 weeks that a client can exceed before being banned. (%(default)s)",
)
parser.add_argument(
"--batch-limit",
default=-1,
type=int,
metavar="<number of texts>",
help="Set maximum number of texts to translate in a batch request (%(default)s)",
)
parser.add_argument(
"--ga-id",
type=str,
default=None,
metavar="<GA ID>",
help="Enable Google Analytics on the API client page by providing an ID (%(default)s)",
)
parser.add_argument(
"--debug", default=False, action="store_true", help="Enable debug environment"
)
parser.add_argument(
"--ssl", default=None, action="store_true", help="Whether to enable SSL"
)
parser.add_argument(
"--frontend-language-source",
type=str,
default="en",
metavar="<language code>",
help="Set frontend default language - source (%(default)s)",
)
parser.add_argument(
"--frontend-language-target",
type=str,
default="es",
metavar="<language code>",
help="Set frontend default language - target (%(default)s)",
)
parser.add_argument(
"--frontend-timeout",
type=int,
default=500,
metavar="<milliseconds>",
help="Set frontend translation timeout (%(default)s)",
)
parser.add_argument(
"--api-keys",
default=False,
action="store_true",
help="Enable API keys database for per-user rate limits lookup",
)
parser.add_argument(
"--require-api-key-origin",
type=str,
default="",
help="Require use of an API key for programmatic access to the API, unless the request origin matches this domain",
)
parser.add_argument(
"--load-only",
type=operator.methodcaller("split", ","),
metavar="<comma-separated language codes>",
help="Set available languages (ar,de,en,es,fr,ga,hi,it,ja,ko,pt,ru,zh)",
)
args = parser.parse_args()
app = create_app(args)
@ -44,7 +107,14 @@ def main():
app.run(host=args.host, port=args.port)
else:
from waitress import serve
serve(app, host=args.host, port=args.port, url_scheme='https' if args.ssl else 'http')
serve(
app,
host=args.host,
port=args.port,
url_scheme="https" if args.ssl else "http",
)
if __name__ == "__main__":
main()

View file

@ -1,31 +1,35 @@
import argparse
from app.api_keys import Database
def manage():
parser = argparse.ArgumentParser(description='LibreTranslate Manage Tools')
subparsers = parser.add_subparsers(help='', dest='command', required=True, title="Command List")
parser = argparse.ArgumentParser(description="LibreTranslate Manage Tools")
subparsers = parser.add_subparsers(
help="", dest="command", required=True, title="Command List"
)
keys_parser = subparsers.add_parser('keys', help='Manage API keys database')
keys_subparser = keys_parser.add_subparsers(help='', dest='sub_command', title="Command List")
keys_parser = subparsers.add_parser("keys", help="Manage API keys database")
keys_subparser = keys_parser.add_subparsers(
help="", dest="sub_command", title="Command List"
)
keys_add_parser = keys_subparser.add_parser('add', help='Add API keys to database')
keys_add_parser.add_argument('req_limit',
type=int,
help='Request Limits (per second)')
keys_add_parser.add_argument('--key',
type=str,
default="auto",
required=False,
help='API Key')
keys_add_parser = keys_subparser.add_parser("add", help="Add API keys to database")
keys_add_parser.add_argument(
"req_limit", type=int, help="Request Limits (per second)"
)
keys_add_parser.add_argument(
"--key", type=str, default="auto", required=False, help="API Key"
)
keys_remove_parser = keys_subparser.add_parser('remove', help='Remove API keys to database')
keys_remove_parser.add_argument('key',
type=str,
help='API Key')
keys_remove_parser = keys_subparser.add_parser(
"remove", help="Remove API keys to database"
)
keys_remove_parser.add_argument("key", type=str, help="API Key")
args = parser.parse_args()
if args.command == 'keys':
if args.command == "keys":
db = Database()
if args.sub_command is None:
# Print keys
@ -36,10 +40,10 @@ def manage():
for item in keys:
print("%s: %s" % item)
elif args.sub_command == 'add':
elif args.sub_command == "add":
print(db.add(args.req_limit, args.key)[0])
elif args.sub_command == 'remove':
elif args.sub_command == "remove":
print(db.remove(args.key))
else:
parser.print_help()
exit(1)
exit(1)

View file

@ -1,8 +1,10 @@
from functools import wraps
class Limiter:
def exempt(self, f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
def exempt(self, f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper

View file

@ -3,5 +3,5 @@
from app.init import check_and_install_models, check_and_install_transliteration
if __name__ == "__main__":
check_and_install_models(force=True)
check_and_install_transliteration(force=True)
check_and_install_models(force=True)
check_and_install_transliteration(force=True)

12
setup.cfg Normal file
View file

@ -0,0 +1,12 @@
[flake8]
exclude = .git,
.vscode,
.gitignore,
README.md,
venv,
test,
setup.py,
app/__init__.py
max-line-length = 136
ignore = E741

View file

@ -1,9 +1,9 @@
import pytest
from app.init import boot
from argostranslate import package
def test_boot_argos():
"""Test Argos translate models initialization"""
boot()
assert len(package.get_installed_packages()) > 2
assert len(package.get_installed_packages()) > 2