Track token rate limits per endpoint

This commit is contained in:
Zed 2022-01-05 22:48:45 +01:00
parent d726894555
commit dd71e60f35
5 changed files with 79 additions and 58 deletions

View file

@ -6,57 +6,57 @@ import types, query, formatters, consts, apiutils, parser
proc getGraphListBySlug*(name, list: string): Future[List] {.async.} = proc getGraphListBySlug*(name, list: string): Future[List] {.async.} =
let let
variables = %*{"screenName": name, "listSlug": list, "withHighlightedLabel": false} variables = %*{"screenName": name, "listSlug": list, "withHighlightedLabel": false}
js = await fetch(graphList ? {"variables": $variables}) url = graphListBySlug ? {"variables": $variables}
result = parseGraphList(js) result = parseGraphList(await fetch(url, Api.listBySlug))
proc getGraphList*(id: string): Future[List] {.async.} = proc getGraphList*(id: string): Future[List] {.async.} =
let let
variables = %*{"listId": id, "withHighlightedLabel": false} variables = %*{"listId": id, "withHighlightedLabel": false}
js = await fetch(graphListId ? {"variables": $variables}) url = graphList ? {"variables": $variables}
result = parseGraphList(js) result = parseGraphList(await fetch(url, Api.list))
proc getListTimeline*(id: string; after=""): Future[Timeline] {.async.} = proc getListTimeline*(id: string; after=""): Future[Timeline] {.async.} =
if id.len == 0: return if id.len == 0: return
let let
ps = genParams({"list_id": id, "ranking_mode": "reverse_chronological"}, after) ps = genParams({"list_id": id, "ranking_mode": "reverse_chronological"}, after)
url = listTimeline ? ps url = listTimeline ? ps
result = parseTimeline(await fetch(url), after) result = parseTimeline(await fetch(url, Api.timeline), after)
proc getListMembers*(list: List; after=""): Future[Result[Profile]] {.async.} = proc getListMembers*(list: List; after=""): Future[Result[Profile]] {.async.} =
if list.id.len == 0: return if list.id.len == 0: return
let let
ps = genParams({"list_id": list.id}, after) ps = genParams({"list_id": list.id}, after)
url = listMembers ? ps url = listMembers ? ps
result = parseListMembers(await fetch(url, oldApi=true), after) result = parseListMembers(await fetch(url, Api.listMembers), after)
proc getProfile*(username: string): Future[Profile] {.async.} = proc getProfile*(username: string): Future[Profile] {.async.} =
let let
ps = genParams({"screen_name": username}) ps = genParams({"screen_name": username})
js = await fetch(userShow ? ps, oldApi=true) js = await fetch(userShow ? ps, Api.userShow)
result = parseUserShow(js, username=username) result = parseUserShow(js, username=username)
proc getProfileById*(userId: string): Future[Profile] {.async.} = proc getProfileById*(userId: string): Future[Profile] {.async.} =
let let
ps = genParams({"user_id": userId}) ps = genParams({"user_id": userId})
js = await fetch(userShow ? ps, oldApi=true) js = await fetch(userShow ? ps, Api.userShow)
result = parseUserShow(js, id=userId) result = parseUserShow(js, id=userId)
proc getTimeline*(id: string; after=""; replies=false): Future[Timeline] {.async.} = proc getTimeline*(id: string; after=""; replies=false): Future[Timeline] {.async.} =
let let
ps = genParams({"userId": id, "include_tweet_replies": $replies}, after) ps = genParams({"userId": id, "include_tweet_replies": $replies}, after)
url = timeline / (id & ".json") ? ps url = timeline / (id & ".json") ? ps
result = parseTimeline(await fetch(url), after) result = parseTimeline(await fetch(url, Api.timeline), after)
proc getMediaTimeline*(id: string; after=""): Future[Timeline] {.async.} = proc getMediaTimeline*(id: string; after=""): Future[Timeline] {.async.} =
let url = mediaTimeline / (id & ".json") ? genParams(cursor=after) let url = mediaTimeline / (id & ".json") ? genParams(cursor=after)
result = parseTimeline(await fetch(url), after) result = parseTimeline(await fetch(url, Api.timeline), after)
proc getPhotoRail*(name: string): Future[PhotoRail] {.async.} = proc getPhotoRail*(name: string): Future[PhotoRail] {.async.} =
let let
ps = genParams({"screen_name": name, "trim_user": "true"}, ps = genParams({"screen_name": name, "trim_user": "true"},
count="18", ext=false) count="18", ext=false)
url = photoRail ? ps url = photoRail ? ps
result = parsePhotoRail(await fetch(url, oldApi=true)) result = parsePhotoRail(await fetch(url, Api.photoRail))
proc getSearch*[T](query: Query; after=""): Future[Result[T]] {.async.} = proc getSearch*[T](query: Query; after=""): Future[Result[T]] {.async.} =
when T is Profile: when T is Profile:
@ -74,14 +74,14 @@ proc getSearch*[T](query: Query; after=""): Future[Result[T]] {.async.} =
let url = search ? genParams(searchParams & @[("q", q), searchMode], after) let url = search ? genParams(searchParams & @[("q", q), searchMode], after)
try: try:
result = parse(await fetch(url), after) result = parse(await fetch(url, Api.search), after)
result.query = query result.query = query
except InternalError: except InternalError:
return Result[T](beginning: true, query: query) return Result[T](beginning: true, query: query)
proc getTweetImpl(id: string; after=""): Future[Conversation] {.async.} = proc getTweetImpl(id: string; after=""): Future[Conversation] {.async.} =
let url = tweet / (id & ".json") ? genParams(cursor=after) let url = tweet / (id & ".json") ? genParams(cursor=after)
result = parseConversation(await fetch(url), id) result = parseConversation(await fetch(url, Api.tweet), id)
proc getReplies*(id, after: string): Future[Result[Chain]] {.async.} = proc getReplies*(id, after: string): Future[Result[Chain]] {.async.} =
result = (await getTweetImpl(id, after)).replies result = (await getTweetImpl(id, after)).replies

View file

@ -3,7 +3,9 @@ import httpclient, asyncdispatch, options, times, strutils, uri
import packedjson, zippy import packedjson, zippy
import types, tokens, consts, parserutils, http_pool import types, tokens, consts, parserutils, http_pool
const rl = "x-rate-limit-" const
rlRemaining = "x-rate-limit-remaining"
rlReset = "x-rate-limit-reset"
var pool: HttpPool var pool: HttpPool
@ -38,11 +40,11 @@ proc genHeaders*(token: Token = nil): HttpHeaders =
"DNT": "1" "DNT": "1"
}) })
proc fetch*(url: Uri; oldApi=false): Future[JsonNode] {.async.} = proc fetch*(url: Uri; api: Api): Future[JsonNode] {.async.} =
once: once:
pool = HttpPool() pool = HttpPool()
var token = await getToken() var token = await getToken(api)
if token.tok.len == 0: if token.tok.len == 0:
raise rateLimitError() raise rateLimitError()
@ -65,9 +67,14 @@ proc fetch*(url: Uri; oldApi=false): Future[JsonNode] {.async.} =
echo resp.status, ": ", body echo resp.status, ": ", body
result = newJNull() result = newJNull()
if not oldApi and resp.headers.hasKey(rl & "reset"): if api != Api.search and resp.headers.hasKey(rlRemaining):
token.remaining = parseInt(resp.headers[rl & "remaining"]) let
token.reset = fromUnix(parseInt(resp.headers[rl & "reset"])) remaining = parseInt(resp.headers[rlRemaining])
reset = parseInt(resp.headers[rlReset])
token.setRateLimit(api, remaining, reset)
echo api, " ", remaining, " ", url.path
else:
echo api, " ", url.path
if result.getError notin {invalidToken, forbidden, badToken}: if result.getError notin {invalidToken, forbidden, badToken}:
token.lastUse = getTime() token.lastUse = getTime()

View file

@ -19,8 +19,8 @@ const
tweet* = timelineApi / "conversation" tweet* = timelineApi / "conversation"
graphql = api / "graphql" graphql = api / "graphql"
graphList* = graphql / "ErWsz9cObLel1BF-HjuBlA/ListBySlug" graphListBySlug* = graphql / "ErWsz9cObLel1BF-HjuBlA/ListBySlug"
graphListId* = graphql / "JADTh6cjebfgetzvF3tQvQ/List" graphList* = graphql / "JADTh6cjebfgetzvF3tQvQ/List"
timelineParams* = { timelineParams* = {
"include_profile_interstitial_type": "0", "include_profile_interstitial_type": "0",

View file

@ -1,13 +1,12 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
import asyncdispatch, httpclient, times, sequtils, json, math, random import asyncdispatch, httpclient, times, sequtils, json, random
import strutils, strformat import strutils, tables
import zippy import zippy
import types, agents, consts, http_pool import types, agents, consts, http_pool
const const
expirationTime = 3.hours maxAge = 3.hours # tokens expire after 3 hours
maxLastUse = 1.hours maxLastUse = 1.hours # if a token is unused for 60 minutes, it expires
resetPeriod = 15.minutes
failDelay = initDuration(minutes=30) failDelay = initDuration(minutes=30)
var var
@ -15,14 +14,9 @@ var
tokenPool: seq[Token] tokenPool: seq[Token]
lastFailed: Time lastFailed: Time
proc getPoolInfo*: string =
if tokenPool.len == 0: return "token pool empty"
let avg = tokenPool.mapIt(it.remaining).sum() div tokenPool.len
return &"{tokenPool.len} tokens, average remaining: {avg}"
proc rateLimitError*(): ref RateLimitError = proc rateLimitError*(): ref RateLimitError =
newException(RateLimitError, "rate limited with " & getPoolInfo()) newException(RateLimitError, "rate limited")
proc fetchToken(): Future[Token] {.async.} = proc fetchToken(): Future[Token] {.async.} =
if getTime() - lastFailed < failDelay: if getTime() - lastFailed < failDelay:
@ -37,51 +31,58 @@ proc fetchToken(): Future[Token] {.async.} =
"authorization": auth "authorization": auth
}) })
var
resp: string
tokNode: JsonNode
tok: string
try: try:
resp = clientPool.use(headers): await c.postContent(activate) let
tokNode = parseJson(uncompress(resp))["guest_token"] resp = clientPool.use(headers): await c.postContent(activate)
tok = tokNode.getStr($(tokNode.getInt)) tokNode = parseJson(uncompress(resp))["guest_token"]
tok = tokNode.getStr($(tokNode.getInt))
time = getTime()
let time = getTime() return Token(tok: tok, init: time, lastUse: time)
result = Token(tok: tok, remaining: 187, reset: time + resetPeriod,
init: time, lastUse: time)
except Exception as e: except Exception as e:
lastFailed = getTime() lastFailed = getTime()
echo "fetching token failed: ", e.msg echo "fetching token failed: ", e.msg
template expired(token: Token): untyped = proc expired(token: Token): bool =
let time = getTime() let time = getTime()
token.init < time - expirationTime or token.init < time - maxAge or token.lastUse < time - maxLastUse
token.lastUse < time - maxLastUse
template isLimited(token: Token): untyped = proc isLimited(token: Token; api: Api): bool =
token == nil or (token.remaining <= 5 and token.reset > getTime()) or if token.isNil or token.expired:
token.expired return true
if api in token.apis:
let limit = token.apis[api]
return (limit.remaining <= 5 and limit.reset > getTime())
else:
return false
proc release*(token: Token; invalid=false) = proc release*(token: Token; invalid=false) =
if token != nil and (invalid or token.expired): if not token.isNil and (invalid or token.expired):
let idx = tokenPool.find(token) let idx = tokenPool.find(token)
if idx > -1: tokenPool.delete(idx) if idx > -1: tokenPool.delete(idx)
proc getToken*(): Future[Token] {.async.} = proc getToken*(api: Api): Future[Token] {.async.} =
for i in 0 ..< tokenPool.len: for i in 0 ..< tokenPool.len:
if not result.isLimited: break if not (result.isNil or result.isLimited(api)):
break
release(result) release(result)
result = tokenPool.sample() result = tokenPool.sample()
if result.isLimited: if result.isNil or result.isLimited(api):
release(result) release(result)
result = await fetchToken() result = await fetchToken()
tokenPool.add result tokenPool.add result
if result == nil: if result.isNil:
raise rateLimitError() raise rateLimitError()
proc setRateLimit*(token: Token; api: Api; remaining, reset: int) =
token.apis[api] = RateLimit(
remaining: remaining,
reset: fromUnix(reset)
)
proc poolTokens*(amount: int) {.async.} = proc poolTokens*(amount: int) {.async.} =
var futs: seq[Future[Token]] var futs: seq[Future[Token]]
for i in 0 ..< amount: for i in 0 ..< amount:
@ -93,13 +94,13 @@ proc poolTokens*(amount: int) {.async.} =
try: newToken = await token try: newToken = await token
except: discard except: discard
if newToken != nil: if not newToken.isNil:
tokenPool.add newToken tokenPool.add newToken
proc initTokenPool*(cfg: Config) {.async.} = proc initTokenPool*(cfg: Config) {.async.} =
clientPool = HttpPool() clientPool = HttpPool()
while true: while true:
if tokenPool.countIt(not it.isLimited) < cfg.minTokens: if tokenPool.countIt(not it.isLimited(Api.timeline)) < cfg.minTokens:
await poolTokens(min(4, cfg.minTokens - tokenPool.len)) await poolTokens(min(4, cfg.minTokens - tokenPool.len))
await sleepAsync(2000) await sleepAsync(2000)

View file

@ -8,12 +8,25 @@ type
RateLimitError* = object of CatchableError RateLimitError* = object of CatchableError
InternalError* = object of CatchableError InternalError* = object of CatchableError
Token* = ref object Api* {.pure.} = enum
tok*: string userShow
photoRail
timeline
search
tweet
list
listBySlug
listMembers
RateLimit* = object
remaining*: int remaining*: int
reset*: Time reset*: Time
Token* = ref object
tok*: string
init*: Time init*: Time
lastUse*: Time lastUse*: Time
apis*: Table[Api, RateLimit]
Error* = enum Error* = enum
null = 0 null = 0