import io import math import os import re import tempfile import uuid from datetime import datetime from functools import wraps from html import unescape from timeit import default_timer import argostranslatefiles from argostranslatefiles import get_supported_formats from flask import Blueprint, Flask, Response, abort, jsonify, render_template, request, send_file, session, url_for from flask_babel import Babel from flask_session import Session from flask_swagger import swagger from flask_swagger_ui import get_swaggerui_blueprint from translatehtml import translate_html from werkzeug.exceptions import HTTPException from werkzeug.http import http_date from werkzeug.utils import secure_filename from libretranslate import flood, remove_translated_files, scheduler, secret, security, storage from libretranslate.language import detect_languages, improve_translation_formatting from libretranslate.locales import ( _, _lazy, get_alternate_locale_links, get_available_locale_codes, get_available_locales, gettext_escaped, gettext_html, lazy_swag, ) from .api_keys import Database, RemoteDatabase from .suggestions import Database as SuggestionsDatabase def get_version(): try: with open("VERSION") as f: return f.read().strip() except: return "?" def get_upload_dir(): upload_dir = os.path.join(tempfile.gettempdir(), "libretranslate-files-translate") if not os.path.isdir(upload_dir): os.mkdir(upload_dir) return upload_dir def get_req_api_key(): if request.is_json: json = get_json_dict(request) ak = json.get("api_key") else: ak = request.values.get("api_key") return ak def get_req_secret(): if request.is_json: json = get_json_dict(request) ak = json.get("secret") else: ak = request.values.get("secret") return ak def get_json_dict(request): d = request.get_json() if not isinstance(d, dict): abort(400, description=_("Invalid JSON format")) return d def get_remote_address(): if request.headers.getlist("X-Forwarded-For"): ip = request.headers.getlist("X-Forwarded-For")[0].split(",")[0] else: ip = request.remote_addr or "127.0.0.1" return ip def get_req_limits(default_limit, api_keys_db, db_multiplier=1, multiplier=1): req_limit = default_limit if api_keys_db: api_key = get_req_api_key() if api_key: api_key_limits = api_keys_db.lookup(api_key) if api_key_limits is not None: req_limit = api_key_limits[0] * db_multiplier return int(req_limit * multiplier) def get_char_limit(default_limit, api_keys_db): char_limit = default_limit if api_keys_db: api_key = get_req_api_key() if api_key: api_key_limits = api_keys_db.lookup(api_key) if api_key_limits is not None: if api_key_limits[1] is not None: char_limit = api_key_limits[1] return char_limit def get_routes_limits(args, api_keys_db): default_req_limit = args.req_limit if default_req_limit == -1: # TODO: better way? default_req_limit = 9999999999999 def minute_limits(): return "%s per minute" % get_req_limits(default_req_limit, api_keys_db) def hourly_limits(n): def func(): decay = (0.75 ** (n - 1)) return "{} per {} hour".format(get_req_limits(args.hourly_req_limit * n, api_keys_db, int(os.environ.get("LT_HOURLY_REQ_LIMIT_MULTIPLIER", 60) * n), decay), n) return func def daily_limits(): return "%s per day" % get_req_limits(args.daily_req_limit, api_keys_db, int(os.environ.get("LT_DAILY_REQ_LIMIT_MULTIPLIER", 1440))) res = [minute_limits] if args.hourly_req_limit > 0: for n in range(1, args.hourly_req_limit_decay + 2): res.append(hourly_limits(n)) if args.daily_req_limit > 0: res.append(daily_limits) return res def create_app(args): from libretranslate.init import boot boot(args.load_only, args.update_models, args.force_update_models) from libretranslate.language import load_languages swagger_url = args.url_prefix + "/docs" # Swagger UI (w/o trailing '/') api_url = args.url_prefix + "/spec" bp = Blueprint('Main app', __name__) storage.setup(args.shared_storage) if not args.disable_files_translation: remove_translated_files.setup(get_upload_dir()) languages = load_languages() language_pairs = {} for lang in languages: language_pairs[lang.code] = sorted([l.to_lang.code for l in lang.translations_from]) # Map userdefined frontend languages to argos language object. if args.frontend_language_source == "auto": frontend_argos_language_source = type( "obj", (object,), {"code": "auto", "name": _("Auto Detect")} ) else: frontend_argos_language_source = next( iter([l for l in languages if l.code == args.frontend_language_source]), None, ) if frontend_argos_language_source is None: frontend_argos_language_source = languages[0] language_target_fallback = languages[1] if len(languages) >= 2 else languages[0] if args.frontend_language_target == "locale": def resolve_language_locale(): loc = get_locale() language_target = next( iter([l for l in languages if l.code == loc]), None ) if language_target is None: language_target = language_target_fallback return language_target frontend_argos_language_target = resolve_language_locale else: language_target = next( iter([l for l in languages if l.code == args.frontend_language_target]), None ) if language_target is None: language_target = language_target_fallback frontend_argos_language_target = lambda: language_target frontend_argos_supported_files_format = [] for file_format in get_supported_formats(): for ff in file_format.supported_file_extensions: frontend_argos_supported_files_format.append(ff) api_keys_db = None if args.req_limit > 0 or args.api_keys or args.daily_req_limit > 0 or args.hourly_req_limit > 0: api_keys_db = None if args.api_keys: api_keys_db = RemoteDatabase(args.api_keys_remote) if args.api_keys_remote else Database(args.api_keys_db_path) from flask_limiter import Limiter def limits_cost(): req_cost = getattr(request, 'req_cost', 1) if args.req_time_cost > 0: return max(req_cost, int(math.ceil(getattr(request, 'duration', 0) / args.req_time_cost))) else: return req_cost limiter = Limiter( key_func=get_remote_address, default_limits=get_routes_limits( args, api_keys_db ), storage_uri=args.req_limit_storage, default_limits_deduct_when=lambda req: True, # Force cost to be called after the request default_limits_cost=limits_cost ) else: from .no_limiter import Limiter limiter = Limiter() if not "gunicorn" in os.environ.get("SERVER_SOFTWARE", ""): # Gunicorn starts the scheduler in the master process scheduler.setup(args) flood.setup(args) secret.setup(args) measure_request = None gauge_request = None if args.metrics: if os.environ.get("PROMETHEUS_MULTIPROC_DIR") is None: default_mp_dir = os.path.abspath(os.path.join("db", "prometheus")) if not os.path.isdir(default_mp_dir): os.mkdir(default_mp_dir) os.environ["PROMETHEUS_MULTIPROC_DIR"] = default_mp_dir from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Gauge, Summary, generate_latest, multiprocess @bp.route("/metrics") @limiter.exempt def prometheus_metrics(): if args.metrics_auth_token: authorization = request.headers.get('Authorization') if authorization != "Bearer " + args.metrics_auth_token: abort(401, description=_("Unauthorized")) registry = CollectorRegistry() multiprocess.MultiProcessCollector(registry) return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST) measure_request = Summary('libretranslate_http_request_duration_seconds', 'Time spent on request', ['endpoint', 'status', 'request_ip', 'api_key']) measure_request.labels('/translate', 200, '127.0.0.1', '') gauge_request = Gauge('libretranslate_http_requests_in_flight', 'Active requests', ['endpoint', 'request_ip', 'api_key'], multiprocess_mode='livesum') gauge_request.labels('/translate', '127.0.0.1', '') def access_check(f): @wraps(f) def func(*a, **kw): ip = get_remote_address() if flood.is_banned(ip): abort(403, description=_("Too many request limits violations")) if args.api_keys: ak = get_req_api_key() if ak and api_keys_db.lookup(ak) is None: abort( 403, description=_("Invalid API key"), ) else: need_key = False key_missing = api_keys_db.lookup(ak) is None if (args.require_api_key_origin and key_missing and not re.match(args.require_api_key_origin, request.headers.get("Origin", "")) ): need_key = True if (args.require_api_key_secret and key_missing and not secret.secret_match(get_req_secret()) ): need_key = True if need_key: description = _("Please contact the server operator to get an API key") if args.get_api_key_link: description = _("Visit %(url)s to get an API key", url=args.get_api_key_link) abort( 400, description=description, ) return f(*a, **kw) if args.metrics: @wraps(func) def measure_func(*a, **kw): start_t = default_timer() status = 200 ip = get_remote_address() ak = get_req_api_key() or '' g = gauge_request.labels(request.path, ip, ak) try: g.inc() return func(*a, **kw) except HTTPException as e: status = e.code raise e finally: request.duration = max(default_timer() - start_t, 0) measure_request.labels(request.path, status, ip, ak).observe(request.duration) g.dec() return measure_func else: @wraps(func) def time_func(*a, **kw): start_t = default_timer() try: return func(*a, **kw) finally: request.duration = max(default_timer() - start_t, 0) return time_func @bp.errorhandler(400) def invalid_api(e): return jsonify({"error": str(e.description)}), 400 @bp.errorhandler(500) def server_error(e): return jsonify({"error": str(e.description)}), 500 @bp.errorhandler(429) def slow_down_error(e): flood.report(get_remote_address()) return jsonify({"error": _("Slowdown:") + " " + str(e.description)}), 429 @bp.errorhandler(403) def denied(e): return jsonify({"error": str(e.description)}), 403 @bp.route("/") @limiter.exempt def index(): if args.disable_web_ui: abort(404) langcode = request.args.get('lang') if langcode and langcode in get_available_locale_codes(not args.debug): session.update(preferred_lang=langcode) return render_template( "index.html", gaId=args.ga_id, frontendTimeout=args.frontend_timeout, api_keys=args.api_keys, get_api_key_link=args.get_api_key_link, web_version=os.environ.get("LT_WEB") is not None, version=get_version(), swagger_url=swagger_url, available_locales=[{'code': l['code'], 'name': _lazy(l['name'])} for l in get_available_locales(not args.debug)], current_locale=get_locale(), alternate_locales=get_alternate_locale_links() ) @bp.route("/js/app.js") @limiter.exempt def appjs(): if args.disable_web_ui: abort(404) response = Response(render_template("app.js.template", url_prefix=args.url_prefix, get_api_key_link=args.get_api_key_link, api_secret=secret.get_current_secret() if args.require_api_key_secret else ""), content_type='application/javascript; charset=utf-8') if args.require_api_key_secret: response.headers['Last-Modified'] = http_date(datetime.now()) response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '-1' return response @bp.get("/languages") @limiter.exempt def langs(): """ Retrieve list of supported languages --- tags: - translate responses: 200: description: List of languages schema: id: languages type: array items: type: object properties: code: type: string description: Language code name: type: string description: Human-readable language name (in English) targets: type: array items: type: string description: Supported target language codes """ return jsonify([{"code": l.code, "name": _lazy(l.name), "targets": language_pairs.get(l.code, [])} for l in languages]) # Add cors @bp.after_request def after_request(response): response.headers.add("Access-Control-Allow-Origin", "*") response.headers.add( "Access-Control-Allow-Headers", "Authorization, Content-Type" ) response.headers.add("Access-Control-Expose-Headers", "Authorization") response.headers.add("Access-Control-Allow-Methods", "GET, POST") response.headers.add("Access-Control-Allow-Credentials", "true") response.headers.add("Access-Control-Max-Age", 60 * 60 * 24 * 20) return response @bp.post("/translate") @access_check def translate(): """ Translate text from a language to another --- tags: - translate parameters: - in: formData name: q schema: oneOf: - type: string example: Hello world! - type: array example: ['Hello world!'] required: true description: Text(s) to translate - in: formData name: source schema: type: string example: en required: true description: Source language code - in: formData name: target schema: type: string example: es required: true description: Target language code - in: formData name: format schema: type: string enum: [text, html] default: text example: text required: false description: > Format of source text: * `text` - Plain text * `html` - HTML markup - in: formData name: api_key schema: type: string example: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx required: false description: API key responses: 200: description: Translated text schema: id: translate type: object properties: translatedText: oneOf: - type: string - type: array description: Translated text(s) 400: description: Invalid request schema: id: error-response type: object properties: error: type: string description: Error message 500: description: Translation error schema: id: error-response type: object properties: error: type: string description: Error message 429: description: Slow down schema: id: error-slow-down type: object properties: error: type: string description: Reason for slow down 403: description: Banned schema: id: error-response type: object properties: error: type: string description: Error message """ if request.is_json: json = get_json_dict(request) q = json.get("q") source_lang = json.get("source") target_lang = json.get("target") text_format = json.get("format") else: q = request.values.get("q") source_lang = request.values.get("source") target_lang = request.values.get("target") text_format = request.values.get("format") if not q: abort(400, description=_("Invalid request: missing %(name)s parameter", name='q')) if not source_lang: abort(400, description=_("Invalid request: missing %(name)s parameter", name='source')) if not target_lang: abort(400, description=_("Invalid request: missing %(name)s parameter", name='target')) if not request.is_json: # Normalize line endings to UNIX style (LF) only so we can consistently # enforce character limits. # https://www.rfc-editor.org/rfc/rfc2046#section-4.1.1 q = "\n".join(q.splitlines()) char_limit = get_char_limit(args.char_limit, api_keys_db) batch = isinstance(q, list) if batch and args.batch_limit != -1: batch_size = len(q) if args.batch_limit < batch_size: abort( 400, description=_("Invalid request: request (%(size)s) exceeds text limit (%(limit)s)", size=batch_size, limit=args.batch_limit), ) src_texts = q if batch else [q] if char_limit != -1: for text in src_texts: if len(text) > char_limit: abort( 400, description=_("Invalid request: request (%(size)s) exceeds text limit (%(limit)s)", size=len(text), limit=char_limit), ) if batch: request.req_cost = max(1, len(q)) if source_lang == "auto": candidate_langs = detect_languages(src_texts) detected_src_lang = candidate_langs[0] else: detected_src_lang = {"confidence": 100.0, "language": source_lang} src_lang = next(iter([l for l in languages if l.code == detected_src_lang["language"]]), None) if src_lang is None: abort(400, description=_("%(lang)s is not supported", lang=source_lang)) tgt_lang = next(iter([l for l in languages if l.code == target_lang]), None) if tgt_lang is None: abort(400, description=_("%(lang)s is not supported",lang=target_lang)) if not text_format: text_format = "text" if text_format not in ["text", "html"]: abort(400, description=_("%(format)s format is not supported", format=text_format)) try: if batch: results = [] for text in q: translator = src_lang.get_translation(tgt_lang) if translator is None: abort(400, description=_("%(tname)s (%(tcode)s) is not available as a target language from %(sname)s (%(scode)s)", tname=_lazy(tgt_lang.name), tcode=tgt_lang.code, sname=_lazy(src_lang.name), scode=src_lang.code)) if text_format == "html": translated_text = str(translate_html(translator, text)) else: translated_text = improve_translation_formatting(text, translator.translate(text)) results.append(unescape(translated_text)) if source_lang == "auto": return jsonify( { "translatedText": results, "detectedLanguage": [detected_src_lang] * len(q) } ) else: return jsonify( { "translatedText": results } ) else: translator = src_lang.get_translation(tgt_lang) if translator is None: abort(400, description=_("%(tname)s (%(tcode)s) is not available as a target language from %(sname)s (%(scode)s)", tname=_lazy(tgt_lang.name), tcode=tgt_lang.code, sname=_lazy(src_lang.name), scode=src_lang.code)) if text_format == "html": translated_text = str(translate_html(translator, q)) else: translated_text = improve_translation_formatting(q, translator.translate(q)) if source_lang == "auto": return jsonify( { "translatedText": unescape(translated_text), "detectedLanguage": detected_src_lang } ) else: return jsonify( { "translatedText": unescape(translated_text) } ) except Exception as e: raise e abort(500, description=_("Cannot translate text: %(text)s", text=str(e))) @bp.post("/translate_file") @access_check def translate_file(): """ Translate file from a language to another --- tags: - translate consumes: - multipart/form-data parameters: - in: formData name: file type: file required: true description: File to translate - in: formData name: source schema: type: string example: en required: true description: Source language code - in: formData name: target schema: type: string example: es required: true description: Target language code - in: formData name: api_key schema: type: string example: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx required: false description: API key responses: 200: description: Translated file schema: id: translate-file type: object properties: translatedFileUrl: type: string description: Translated file url 400: description: Invalid request schema: id: error-response type: object properties: error: type: string description: Error message 500: description: Translation error schema: id: error-response type: object properties: error: type: string description: Error message 429: description: Slow down schema: id: error-slow-down type: object properties: error: type: string description: Reason for slow down 403: description: Banned schema: id: error-response type: object properties: error: type: string description: Error message """ if args.disable_files_translation: abort(403, description=_("Files translation are disabled on this server.")) source_lang = request.form.get("source") target_lang = request.form.get("target") file = request.files['file'] char_limit = get_char_limit(args.char_limit, api_keys_db) if not file: abort(400, description=_("Invalid request: missing %(name)s parameter", name='file')) if not source_lang: abort(400, description=_("Invalid request: missing %(name)s parameter", name='source')) if not target_lang: abort(400, description=_("Invalid request: missing %(name)s parameter", name='target')) if file.filename == '': abort(400, description=_("Invalid request: empty file")) if os.path.splitext(file.filename)[1] not in frontend_argos_supported_files_format: abort(400, description=_("Invalid request: file format not supported")) src_lang = next(iter([l for l in languages if l.code == source_lang]), None) if src_lang is None: abort(400, description=_("%(lang)s is not supported", lang=source_lang)) tgt_lang = next(iter([l for l in languages if l.code == target_lang]), None) if tgt_lang is None: abort(400, description=_("%(lang)s is not supported", lang=target_lang)) try: filename = str(uuid.uuid4()) + '.' + secure_filename(file.filename) filepath = os.path.join(get_upload_dir(), filename) file.save(filepath) # Not an exact science: take the number of bytes and divide by # the character limit. Assuming a plain text file, this will # set the cost of the request to N = bytes / char_limit, which is # roughly equivalent to a batch process of N batches assuming # each batch uses all available limits if char_limit > 0: request.req_cost = max(1, int(os.path.getsize(filepath) / char_limit)) translated_file_path = argostranslatefiles.translate_file(src_lang.get_translation(tgt_lang), filepath) translated_filename = os.path.basename(translated_file_path) return jsonify( { "translatedFileUrl": url_for('Main app.download_file', filename=translated_filename, _external=True) } ) except Exception as e: abort(500, description=e) @bp.get("/download_file/") def download_file(filename: str): """ Download a translated file """ if args.disable_files_translation: abort(400, description=_("Files translation are disabled on this server.")) filepath = os.path.join(get_upload_dir(), filename) try: checked_filepath = security.path_traversal_check(filepath, get_upload_dir()) if os.path.isfile(checked_filepath): filepath = checked_filepath except security.SuspiciousFileOperationError: abort(400, description=_("Invalid filename")) return_data = io.BytesIO() with open(filepath, 'rb') as fo: return_data.write(fo.read()) return_data.seek(0) download_filename = filename.split('.') download_filename.pop(0) download_filename = '.'.join(download_filename) return send_file(return_data, as_attachment=True, download_name=download_filename) @bp.post("/detect") @access_check def detect(): """ Detect the language of a single text --- tags: - translate parameters: - in: formData name: q schema: type: string example: What language is this? required: true description: Text to detect - in: formData name: api_key schema: type: string example: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx required: false description: API key responses: 200: description: Detections schema: id: detections type: array items: type: object properties: confidence: type: number format: integer minimum: 0 maximum: 100 description: Confidence value example: 100 language: type: string description: Language code example: en 400: description: Invalid request schema: id: error-response type: object properties: error: type: string description: Error message 500: description: Detection error schema: id: error-response type: object properties: error: type: string description: Error message 429: description: Slow down schema: id: error-slow-down type: object properties: error: type: string description: Reason for slow down 403: description: Banned schema: id: error-response type: object properties: error: type: string description: Error message """ if request.is_json: json = get_json_dict(request) q = json.get("q") else: q = request.values.get("q") if not q: abort(400, description=_("Invalid request: missing %(name)s parameter", name='q')) return jsonify(detect_languages(q)) @bp.route("/frontend/settings") @limiter.exempt def frontend_settings(): """ Retrieve frontend specific settings --- tags: - frontend responses: 200: description: frontend settings schema: id: frontend-settings type: object properties: charLimit: type: integer description: Character input limit for this language (-1 indicates no limit) frontendTimeout: type: integer description: Frontend translation timeout apiKeys: type: boolean description: Whether the API key database is enabled. keyRequired: type: boolean description: Whether an API key is required. suggestions: type: boolean description: Whether submitting suggestions is enabled. supportedFilesFormat: type: array items: type: string description: Supported files format language: type: object properties: source: type: object properties: code: type: string description: Language code name: type: string description: Human-readable language name (in English) target: type: object properties: code: type: string description: Language code name: type: string description: Human-readable language name (in English) """ target_lang = frontend_argos_language_target() return jsonify( { "charLimit": args.char_limit, "frontendTimeout": args.frontend_timeout, "apiKeys": args.api_keys, "keyRequired": bool(args.api_keys and args.require_api_key_origin), "suggestions": args.suggestions, "filesTranslation": not args.disable_files_translation, "supportedFilesFormat": [] if args.disable_files_translation else frontend_argos_supported_files_format, "language": { "source": { "code": frontend_argos_language_source.code, "name": _lazy(frontend_argos_language_source.name), }, "target": { "code": target_lang.code, "name": _lazy(target_lang.name), }, }, } ) @bp.post("/suggest") def suggest(): """ Submit a suggestion to improve a translation --- tags: - feedback parameters: - in: formData name: q schema: type: string example: Hello world! required: true description: Original text - in: formData name: s schema: type: string example: ¡Hola mundo! required: true description: Suggested translation - in: formData name: source schema: type: string example: en required: true description: Language of original text - in: formData name: target schema: type: string example: es required: true description: Language of suggested translation responses: 200: description: Success schema: id: suggest-response type: object properties: success: type: boolean description: Whether submission was successful 403: description: Not authorized schema: id: error-response type: object properties: error: type: string description: Error message """ if not args.suggestions: abort(403, description=_("Suggestions are disabled on this server.")) if request.is_json: json = get_json_dict(request) q = json.get("q") s = json.get("s") source_lang = json.get("source") target_lang = json.get("target") else: q = request.values.get("q") s = request.values.get("s") source_lang = request.values.get("source") target_lang = request.values.get("target") if not q: abort(400, description=_("Invalid request: missing %(name)s parameter", name='q')) if not s: abort(400, description=_("Invalid request: missing %(name)s parameter", name='s')) if not source_lang: abort(400, description=_("Invalid request: missing %(name)s parameter", name='source')) if not target_lang: abort(400, description=_("Invalid request: missing %(name)s parameter", name='target')) SuggestionsDatabase().add(q, s, source_lang, target_lang) return jsonify({"success": True}) app = Flask(__name__) app.config["SESSION_TYPE"] = "filesystem" app.config["SESSION_FILE_DIR"] = os.path.join("db", "sessions") app.config["JSON_AS_ASCII"] = False Session(app) if args.debug: app.config["TEMPLATES_AUTO_RELOAD"] = True if args.url_prefix: app.register_blueprint(bp, url_prefix=args.url_prefix) else: app.register_blueprint(bp) limiter.init_app(app) swag = swagger(app) swag["info"]["version"] = get_version() swag["info"]["title"] = "LibreTranslate" @app.route(api_url) @limiter.exempt def spec(): return jsonify(lazy_swag(swag)) app.config["BABEL_TRANSLATION_DIRECTORIES"] = 'locales' def get_locale(): override_lang = request.headers.get('X-Override-Accept-Language') if override_lang and override_lang in get_available_locale_codes(): return override_lang return session.get('preferred_lang', request.accept_languages.best_match(get_available_locale_codes())) Babel(app, locale_selector=get_locale) app.jinja_env.globals.update(_e=gettext_escaped, _h=gettext_html) # Call factory function to create our blueprint swaggerui_blueprint = get_swaggerui_blueprint(swagger_url, api_url) if args.url_prefix: app.register_blueprint(swaggerui_blueprint, url_prefix=swagger_url) else: app.register_blueprint(swaggerui_blueprint) return app