From 34964f9e569616c395dab855c3409c9b1902d31a Mon Sep 17 00:00:00 2001 From: Zed Date: Wed, 5 Jan 2022 23:38:46 +0100 Subject: [PATCH] Track pending token requests to limit concurrency --- src/apiutils.nim | 9 +++------ src/tokens.nim | 41 ++++++++++++++++++++++++++++------------- src/types.nim | 1 + 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/apiutils.nim b/src/apiutils.nim index ada68a9..25d9456 100644 --- a/src/apiutils.nim +++ b/src/apiutils.nim @@ -72,15 +72,12 @@ proc fetch*(url: Uri; api: Api): Future[JsonNode] {.async.} = remaining = parseInt(resp.headers[rlRemaining]) reset = parseInt(resp.headers[rlReset]) token.setRateLimit(api, remaining, reset) - echo api, " ", remaining, " ", url.path - else: - echo api, " ", url.path if result.getError notin {invalidToken, forbidden, badToken}: - token.lastUse = getTime() + release(token, used=true) else: echo "fetch error: ", result.getError - release(token, true) + release(token, invalid=true) raise rateLimitError() if resp.status == $Http400: @@ -90,5 +87,5 @@ proc fetch*(url: Uri; api: Api): Future[JsonNode] {.async.} = except Exception as e: echo "error: ", e.name, ", msg: ", e.msg, ", token: ", token[], ", url: ", url if "length" notin e.msg and "descriptor" notin e.msg: - release(token, true) + release(token, invalid=true) raise rateLimitError() diff --git a/src/tokens.nim b/src/tokens.nim index c18b032..96f7dbd 100644 --- a/src/tokens.nim +++ b/src/tokens.nim @@ -5,8 +5,9 @@ import zippy import types, agents, consts, http_pool const - maxAge = 3.hours # tokens expire after 3 hours - maxLastUse = 1.hours # if a token is unused for 60 minutes, it expires + maxConcurrentReqs = 5 # max requests at a time per token, to avoid race conditions + maxAge = 3.hours # tokens expire after 3 hours + maxLastUse = 1.hours # if a token is unused for 60 minutes, it expires failDelay = initDuration(minutes=30) var @@ -19,6 +20,7 @@ proc getPoolJson*: string = for token in tokenPool: list[token.tok] = %*{ "apis": newJObject(), + "pending": token.pending, "init": $token.init, "lastUse": $token.lastUse } @@ -69,35 +71,48 @@ proc isLimited(token: Token; api: Api): bool = if api in token.apis: let limit = token.apis[api] - return (limit.remaining <= 5 and limit.reset > getTime()) + return (limit.remaining <= 10 and limit.reset > getTime()) else: return false -proc release*(token: Token; invalid=false) = - if not token.isNil and (invalid or token.expired): +proc isReady(token: Token; api: Api): bool = + not (token.isNil or token.pending > maxConcurrentReqs or token.isLimited(api)) + +proc release*(token: Token; used=false; invalid=false) = + if token.isNil: return + if invalid or token.expired: let idx = tokenPool.find(token) if idx > -1: tokenPool.delete(idx) + elif used: + dec token.pending + token.lastUse = getTime() proc getToken*(api: Api): Future[Token] {.async.} = for i in 0 ..< tokenPool.len: - if not (result.isNil or result.isLimited(api)): - break + if result.isReady(api): break release(result) result = tokenPool.sample() - if result.isNil or result.isLimited(api): + if not result.isReady(api): release(result) result = await fetchToken() tokenPool.add result - if result.isNil: + if not result.isNil: + inc result.pending + else: raise rateLimitError() proc setRateLimit*(token: Token; api: Api; remaining, reset: int) = - token.apis[api] = RateLimit( - remaining: remaining, - reset: fromUnix(reset) - ) + let reset = fromUnix(reset) + + # avoid undefined behavior in race conditions + if api in token.apis: + let limit = token.apis[api] + if limit.reset >= reset and limit.remaining < remaining: + return + + token.apis[api] = RateLimit(remaining: remaining, reset: reset) proc poolTokens*(amount: int) {.async.} = var futs: seq[Future[Token]] diff --git a/src/types.nim b/src/types.nim index 53b171c..d14bf6b 100644 --- a/src/types.nim +++ b/src/types.nim @@ -26,6 +26,7 @@ type tok*: string init*: Time lastUse*: Time + pending*: int apis*: Table[Api, RateLimit] Error* = enum