diff --git a/libretranslate/app.py b/libretranslate/app.py index 74d53c8..953593a 100644 --- a/libretranslate/app.py +++ b/libretranslate/app.py @@ -47,7 +47,8 @@ def get_version(): def get_upload_dir(): - upload_dir = os.path.join(tempfile.gettempdir(), "libretranslate-files-translate") + upload_dir = os.path.join(tempfile.gettempdir(), + "libretranslate-files-translate") if not os.path.isdir(upload_dir): os.mkdir(upload_dir) @@ -64,6 +65,7 @@ def get_req_api_key(): return ak + def get_req_secret(): if request.is_json: json = get_json_dict(request) @@ -130,8 +132,8 @@ def get_routes_limits(args, api_keys_db): def hourly_limits(n): def func(): - decay = (0.75 ** (n - 1)) - return "{} per {} hour".format(get_req_limits(args.hourly_req_limit * n, api_keys_db, int(os.environ.get("LT_HOURLY_REQ_LIMIT_MULTIPLIER", 60) * n), decay), n) + decay = (0.75 ** (n - 1)) + return "{} per {} hour".format(get_req_limits(args.hourly_req_limit * n, api_keys_db, int(os.environ.get("LT_HOURLY_REQ_LIMIT_MULTIPLIER", 60) * n), decay), n) return func def daily_limits(): @@ -140,8 +142,8 @@ def get_routes_limits(args, api_keys_db): res = [minute_limits] if args.hourly_req_limit > 0: - for n in range(1, args.hourly_req_limit_decay + 2): - res.append(hourly_limits(n)) + for n in range(1, args.hourly_req_limit_decay + 2): + res.append(hourly_limits(n)) if args.daily_req_limit > 0: res.append(daily_limits) @@ -149,6 +151,12 @@ def get_routes_limits(args, api_keys_db): return res +def clean_text(text): + pattern = re.compile(r'(?<=[^.!?])\n') + cleaned_text = re.sub(pattern, ' ', text) + return cleaned_text + + def create_app(args): from libretranslate.init import boot @@ -168,7 +176,8 @@ def create_app(args): languages = load_languages() language_pairs = {} for lang in languages: - language_pairs[lang.code] = sorted([l.to_lang.code for l in lang.translations_from]) + language_pairs[lang.code] = sorted( + [l.to_lang.code for l in lang.translations_from]) # Map userdefined frontend languages to argos language object. if args.frontend_language_source == "auto": @@ -177,33 +186,36 @@ def create_app(args): ) else: frontend_argos_language_source = next( - iter([l for l in languages if l.code == args.frontend_language_source]), + iter([l for l in languages if l.code == + args.frontend_language_source]), None, ) if frontend_argos_language_source is None: frontend_argos_language_source = languages[0] - - language_target_fallback = languages[1] if len(languages) >= 2 else languages[0] + language_target_fallback = languages[1] if len( + languages) >= 2 else languages[0] if args.frontend_language_target == "locale": - def resolve_language_locale(): - loc = get_locale() - language_target = next( - iter([l for l in languages if l.code == loc]), None - ) - if language_target is None: - language_target = language_target_fallback - return language_target + def resolve_language_locale(): + loc = get_locale() + language_target = next( + iter([l for l in languages if l.code == loc]), None + ) + if language_target is None: + language_target = language_target_fallback + return language_target - frontend_argos_language_target = resolve_language_locale + frontend_argos_language_target = resolve_language_locale else: - language_target = next( - iter([l for l in languages if l.code == args.frontend_language_target]), None - ) - if language_target is None: - language_target = language_target_fallback - frontend_argos_language_target = lambda: language_target + language_target = next( + iter([l for l in languages if l.code == + args.frontend_language_target]), None + ) + if language_target is None: + language_target = language_target_fallback + + def frontend_argos_language_target(): return language_target frontend_argos_supported_files_format = [] @@ -216,16 +228,17 @@ def create_app(args): if args.req_limit > 0 or args.api_keys or args.daily_req_limit > 0 or args.hourly_req_limit > 0: api_keys_db = None if args.api_keys: - api_keys_db = RemoteDatabase(args.api_keys_remote) if args.api_keys_remote else Database(args.api_keys_db_path) + api_keys_db = RemoteDatabase( + args.api_keys_remote) if args.api_keys_remote else Database(args.api_keys_db_path) from flask_limiter import Limiter def limits_cost(): - req_cost = getattr(request, 'req_cost', 1) - if args.req_time_cost > 0: - return max(req_cost, int(math.ceil(getattr(request, 'duration', 0) / args.req_time_cost))) - else: - return req_cost + req_cost = getattr(request, 'req_cost', 1) + if args.req_time_cost > 0: + return max(req_cost, int(math.ceil(getattr(request, 'duration', 0) / args.req_time_cost))) + else: + return req_cost limiter = Limiter( key_func=get_remote_address, @@ -233,7 +246,8 @@ def create_app(args): args, api_keys_db ), storage_uri=args.req_limit_storage, - default_limits_deduct_when=lambda req: True, # Force cost to be called after the request + # Force cost to be called after the request + default_limits_deduct_when=lambda req: True, default_limits_cost=limits_cost ) else: @@ -242,8 +256,8 @@ def create_app(args): limiter = Limiter() if not "gunicorn" in os.environ.get("SERVER_SOFTWARE", ""): - # Gunicorn starts the scheduler in the master process - scheduler.setup(args) + # Gunicorn starts the scheduler in the master process + scheduler.setup(args) flood.setup(args) secret.setup(args) @@ -251,31 +265,33 @@ def create_app(args): measure_request = None gauge_request = None if args.metrics: - if os.environ.get("PROMETHEUS_MULTIPROC_DIR") is None: - default_mp_dir = os.path.abspath(os.path.join("db", "prometheus")) - if not os.path.isdir(default_mp_dir): - os.mkdir(default_mp_dir) - os.environ["PROMETHEUS_MULTIPROC_DIR"] = default_mp_dir + if os.environ.get("PROMETHEUS_MULTIPROC_DIR") is None: + default_mp_dir = os.path.abspath(os.path.join("db", "prometheus")) + if not os.path.isdir(default_mp_dir): + os.mkdir(default_mp_dir) + os.environ["PROMETHEUS_MULTIPROC_DIR"] = default_mp_dir - from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Gauge, Summary, generate_latest, multiprocess + from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Gauge, Summary, generate_latest, multiprocess - @bp.route("/metrics") - @limiter.exempt - def prometheus_metrics(): - if args.metrics_auth_token: - authorization = request.headers.get('Authorization') - if authorization != "Bearer " + args.metrics_auth_token: - abort(401, description=_("Unauthorized")) + @bp.route("/metrics") + @limiter.exempt + def prometheus_metrics(): + if args.metrics_auth_token: + authorization = request.headers.get('Authorization') + if authorization != "Bearer " + args.metrics_auth_token: + abort(401, description=_("Unauthorized")) - registry = CollectorRegistry() - multiprocess.MultiProcessCollector(registry) - return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST) + registry = CollectorRegistry() + multiprocess.MultiProcessCollector(registry) + return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST) - measure_request = Summary('libretranslate_http_request_duration_seconds', 'Time spent on request', ['endpoint', 'status', 'request_ip', 'api_key']) - measure_request.labels('/translate', 200, '127.0.0.1', '') + measure_request = Summary('libretranslate_http_request_duration_seconds', 'Time spent on request', [ + 'endpoint', 'status', 'request_ip', 'api_key']) + measure_request.labels('/translate', 200, '127.0.0.1', '') - gauge_request = Gauge('libretranslate_http_requests_in_flight', 'Active requests', ['endpoint', 'request_ip', 'api_key'], multiprocess_mode='livesum') - gauge_request.labels('/translate', '127.0.0.1', '') + gauge_request = Gauge('libretranslate_http_requests_in_flight', 'Active requests', [ + 'endpoint', 'request_ip', 'api_key'], multiprocess_mode='livesum') + gauge_request.labels('/translate', '127.0.0.1', '') def access_check(f): @wraps(f) @@ -293,59 +309,62 @@ def create_app(args): description=_("Invalid API key"), ) else: - need_key = False - key_missing = api_keys_db.lookup(ak) is None + need_key = False + key_missing = api_keys_db.lookup(ak) is None - if (args.require_api_key_origin - and key_missing - and not re.match(args.require_api_key_origin, request.headers.get("Origin", "")) - ): - need_key = True + if (args.require_api_key_origin + and key_missing + and not re.match(args.require_api_key_origin, request.headers.get("Origin", "")) + ): + need_key = True - if (args.require_api_key_secret - and key_missing - and not secret.secret_match(get_req_secret()) - ): - need_key = True + if (args.require_api_key_secret + and key_missing + and not secret.secret_match(get_req_secret()) + ): + need_key = True - if need_key: - description = _("Please contact the server operator to get an API key") - if args.get_api_key_link: - description = _("Visit %(url)s to get an API key", url=args.get_api_key_link) - abort( - 400, - description=description, - ) + if need_key: + description = _( + "Please contact the server operator to get an API key") + if args.get_api_key_link: + description = _( + "Visit %(url)s to get an API key", url=args.get_api_key_link) + abort( + 400, + description=description, + ) return f(*a, **kw) if args.metrics: - @wraps(func) - def measure_func(*a, **kw): - start_t = default_timer() - status = 200 - ip = get_remote_address() - ak = get_req_api_key() or '' - g = gauge_request.labels(request.path, ip, ak) - try: - g.inc() - return func(*a, **kw) - except HTTPException as e: - status = e.code - raise e - finally: - request.duration = max(default_timer() - start_t, 0) - measure_request.labels(request.path, status, ip, ak).observe(request.duration) - g.dec() - return measure_func + @wraps(func) + def measure_func(*a, **kw): + start_t = default_timer() + status = 200 + ip = get_remote_address() + ak = get_req_api_key() or '' + g = gauge_request.labels(request.path, ip, ak) + try: + g.inc() + return func(*a, **kw) + except HTTPException as e: + status = e.code + raise e + finally: + request.duration = max(default_timer() - start_t, 0) + measure_request.labels( + request.path, status, ip, ak).observe(request.duration) + g.dec() + return measure_func else: - @wraps(func) - def time_func(*a, **kw): - start_t = default_timer() - try: - return func(*a, **kw) - finally: - request.duration = max(default_timer() - start_t, 0) - return time_func + @wraps(func) + def time_func(*a, **kw): + start_t = default_timer() + try: + return func(*a, **kw) + finally: + request.duration = max(default_timer() - start_t, 0) + return time_func @bp.errorhandler(400) def invalid_api(e): @@ -383,7 +402,8 @@ def create_app(args): web_version=os.environ.get("LT_WEB") is not None, version=get_version(), swagger_url=swagger_url, - available_locales=[{'code': l['code'], 'name': _lazy(l['name'])} for l in get_available_locales(not args.debug)], + available_locales=[{'code': l['code'], 'name': _lazy( + l['name'])} for l in get_available_locales(not args.debug)], current_locale=get_locale(), alternate_locales=get_alternate_locale_links() ) @@ -391,21 +411,21 @@ def create_app(args): @bp.route("/js/app.js") @limiter.exempt def appjs(): - if args.disable_web_ui: + if args.disable_web_ui: abort(404) - response = Response(render_template("app.js.template", - url_prefix=args.url_prefix, - get_api_key_link=args.get_api_key_link, - api_secret=secret.get_current_secret() if args.require_api_key_secret else ""), content_type='application/javascript; charset=utf-8') + response = Response(render_template("app.js.template", + url_prefix=args.url_prefix, + get_api_key_link=args.get_api_key_link, + api_secret=secret.get_current_secret() if args.require_api_key_secret else ""), content_type='application/javascript; charset=utf-8') - if args.require_api_key_secret: - response.headers['Last-Modified'] = http_date(datetime.now()) - response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0' - response.headers['Pragma'] = 'no-cache' - response.headers['Expires'] = '-1' + if args.require_api_key_secret: + response.headers['Last-Modified'] = http_date(datetime.now()) + response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0' + response.headers['Pragma'] = 'no-cache' + response.headers['Expires'] = '-1' - return response + return response @bp.get("/languages") @limiter.exempt @@ -565,17 +585,17 @@ def create_app(args): text_format = request.values.get("format") if not q: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='q')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='q')) if not source_lang: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='source')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='source')) if not target_lang: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='target')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='target')) if not request.is_json: - # Normalize line endings to UNIX style (LF) only so we can consistently - # enforce character limits. - # https://www.rfc-editor.org/rfc/rfc2046#section-4.1.1 - q = "\n".join(q.splitlines()) + q = clean_text(q) char_limit = get_char_limit(args.char_limit, api_keys_db) @@ -586,7 +606,8 @@ def create_app(args): if args.batch_limit < batch_size: abort( 400, - description=_("Invalid request: request (%(size)s) exceeds text limit (%(limit)s)", size=batch_size, limit=args.batch_limit), + description=_("Invalid request: request (%(size)s) exceeds text limit (%(limit)s)", + size=batch_size, limit=args.batch_limit), ) src_texts = q if batch else [q] @@ -596,7 +617,8 @@ def create_app(args): if len(text) > char_limit: abort( 400, - description=_("Invalid request: request (%(size)s) exceeds text limit (%(limit)s)", size=len(text), limit=char_limit), + description=_("Invalid request: request (%(size)s) exceeds text limit (%(limit)s)", size=len( + text), limit=char_limit), ) if batch: @@ -608,21 +630,24 @@ def create_app(args): else: detected_src_lang = {"confidence": 100.0, "language": source_lang} - src_lang = next(iter([l for l in languages if l.code == detected_src_lang["language"]]), None) + src_lang = next( + iter([l for l in languages if l.code == detected_src_lang["language"]]), None) if src_lang is None: abort(400, description=_("%(lang)s is not supported", lang=source_lang)) - tgt_lang = next(iter([l for l in languages if l.code == target_lang]), None) + tgt_lang = next( + iter([l for l in languages if l.code == target_lang]), None) if tgt_lang is None: - abort(400, description=_("%(lang)s is not supported",lang=target_lang)) + abort(400, description=_("%(lang)s is not supported", lang=target_lang)) if not text_format: text_format = "text" if text_format not in ["text", "html"]: - abort(400, description=_("%(format)s format is not supported", format=text_format)) + abort(400, description=_( + "%(format)s format is not supported", format=text_format)) try: if batch: @@ -630,12 +655,14 @@ def create_app(args): for text in q: translator = src_lang.get_translation(tgt_lang) if translator is None: - abort(400, description=_("%(tname)s (%(tcode)s) is not available as a target language from %(sname)s (%(scode)s)", tname=_lazy(tgt_lang.name), tcode=tgt_lang.code, sname=_lazy(src_lang.name), scode=src_lang.code)) + abort(400, description=_("%(tname)s (%(tcode)s) is not available as a target language from %(sname)s (%(scode)s)", tname=_lazy( + tgt_lang.name), tcode=tgt_lang.code, sname=_lazy(src_lang.name), scode=src_lang.code)) if text_format == "html": translated_text = str(translate_html(translator, text)) else: - translated_text = improve_translation_formatting(text, translator.translate(text)) + translated_text = improve_translation_formatting( + text, translator.translate(text)) results.append(unescape(translated_text)) if source_lang == "auto": @@ -647,19 +674,21 @@ def create_app(args): ) else: return jsonify( - { + { "translatedText": results - } + } ) else: translator = src_lang.get_translation(tgt_lang) if translator is None: - abort(400, description=_("%(tname)s (%(tcode)s) is not available as a target language from %(sname)s (%(scode)s)", tname=_lazy(tgt_lang.name), tcode=tgt_lang.code, sname=_lazy(src_lang.name), scode=src_lang.code)) + abort(400, description=_("%(tname)s (%(tcode)s) is not available as a target language from %(sname)s (%(scode)s)", tname=_lazy( + tgt_lang.name), tcode=tgt_lang.code, sname=_lazy(src_lang.name), scode=src_lang.code)) if text_format == "html": translated_text = str(translate_html(translator, q)) else: - translated_text = improve_translation_formatting(q, translator.translate(q)) + translated_text = improve_translation_formatting( + q, translator.translate(q)) if source_lang == "auto": return jsonify( @@ -676,7 +705,8 @@ def create_app(args): ) except Exception as e: raise e - abort(500, description=_("Cannot translate text: %(text)s", text=str(e))) + abort(500, description=_( + "Cannot translate text: %(text)s", text=str(e))) @bp.post("/translate_file") @access_check @@ -763,7 +793,8 @@ def create_app(args): description: Error message """ if args.disable_files_translation: - abort(403, description=_("Files translation are disabled on this server.")) + abort(403, description=_( + "Files translation are disabled on this server.")) source_lang = request.form.get("source") target_lang = request.form.get("target") @@ -771,11 +802,14 @@ def create_app(args): char_limit = get_char_limit(args.char_limit, api_keys_db) if not file: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='file')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='file')) if not source_lang: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='source')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='source')) if not target_lang: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='target')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='target')) if file.filename == '': abort(400, description=_("Invalid request: empty file")) @@ -783,12 +817,14 @@ def create_app(args): if os.path.splitext(file.filename)[1] not in frontend_argos_supported_files_format: abort(400, description=_("Invalid request: file format not supported")) - src_lang = next(iter([l for l in languages if l.code == source_lang]), None) + src_lang = next( + iter([l for l in languages if l.code == source_lang]), None) if src_lang is None: abort(400, description=_("%(lang)s is not supported", lang=source_lang)) - tgt_lang = next(iter([l for l in languages if l.code == target_lang]), None) + tgt_lang = next( + iter([l for l in languages if l.code == target_lang]), None) if tgt_lang is None: abort(400, description=_("%(lang)s is not supported", lang=target_lang)) @@ -805,9 +841,11 @@ def create_app(args): # roughly equivalent to a batch process of N batches assuming # each batch uses all available limits if char_limit > 0: - request.req_cost = max(1, int(os.path.getsize(filepath) / char_limit)) + request.req_cost = max( + 1, int(os.path.getsize(filepath) / char_limit)) - translated_file_path = argostranslatefiles.translate_file(src_lang.get_translation(tgt_lang), filepath) + translated_file_path = argostranslatefiles.translate_file( + src_lang.get_translation(tgt_lang), filepath) translated_filename = os.path.basename(translated_file_path) return jsonify( @@ -824,11 +862,13 @@ def create_app(args): Download a translated file """ if args.disable_files_translation: - abort(400, description=_("Files translation are disabled on this server.")) + abort(400, description=_( + "Files translation are disabled on this server.")) filepath = os.path.join(get_upload_dir(), filename) try: - checked_filepath = security.path_traversal_check(filepath, get_upload_dir()) + checked_filepath = security.path_traversal_check( + filepath, get_upload_dir()) if os.path.isfile(checked_filepath): filepath = checked_filepath except security.SuspiciousFileOperationError: @@ -932,7 +972,8 @@ def create_app(args): q = request.values.get("q") if not q: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='q')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='q')) return jsonify(detect_languages(q)) @@ -1089,13 +1130,17 @@ def create_app(args): target_lang = request.values.get("target") if not q: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='q')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='q')) if not s: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='s')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='s')) if not source_lang: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='source')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='source')) if not target_lang: - abort(400, description=_("Invalid request: missing %(name)s parameter", name='target')) + abort(400, description=_( + "Invalid request: missing %(name)s parameter", name='target')) SuggestionsDatabase().add(q, s, source_lang, target_lang) return jsonify({"success": True})