Gather and wait on async requests

This sends out the request tasks all at once and then aggregates the
results, instead of just running them one after another asynchronously.
This commit is contained in:
Mouse Reeve 2022-05-30 11:58:39 -07:00
parent 5e81ec75fb
commit 45f2199c71

View file

@ -12,7 +12,7 @@ from django.db.models import signals
from requests import HTTPError from requests import HTTPError
from bookwyrm import book_search, models from bookwyrm import book_search, models
from bookwyrm.settings import SEARCH_TIMEOUT, USER_AGENT from bookwyrm.settings import QUERY_TIMEOUT, USER_AGENT
from bookwyrm.tasks import app from bookwyrm.tasks import app
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -22,9 +22,8 @@ class ConnectorException(HTTPError):
"""when the connector can't do what was asked""" """when the connector can't do what was asked"""
async def async_connector_search(query, items, params): async def get_results(session, url, params, query, connector):
"""Try a number of requests simultaneously""" """try this specific connector"""
timeout = aiohttp.ClientTimeout(total=SEARCH_TIMEOUT)
# pylint: disable=line-too-long # pylint: disable=line-too-long
headers = { headers = {
"Accept": ( "Accept": (
@ -32,21 +31,36 @@ async def async_connector_search(query, items, params):
), ),
"User-Agent": USER_AGENT, "User-Agent": USER_AGENT,
} }
try:
async with session.get(url, headers=headers, params=params) as response:
try:
raw_data = await response.json()
except aiohttp.client_exceptions.ContentTypeError as err:
logger.exception(err)
return None
return {
"connector": connector,
"results": connector.process_search_response(query, raw_data),
}
except asyncio.TimeoutError:
logger.exception("Connection timout for url: %s", url)
async def async_connector_search(query, items, params):
"""Try a number of requests simultaneously"""
timeout = aiohttp.ClientTimeout(total=QUERY_TIMEOUT)
async with aiohttp.ClientSession(timeout=timeout) as session: async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = []
for url, connector in items: for url, connector in items:
url = connector.get_search_url(query) tasks.append(
# raise_not_valid_url(url) asyncio.ensure_future(
get_results(session, url, params, query, connector)
)
)
async with session.get(url, headers=headers, params=params) as response: results = await asyncio.gather(*tasks)
print("Status:", response.status) return results
print(response.ok)
print("Content-type:", response.headers["content-type"])
raw_response = await response.json()
return {
"connector": connector,
"results": connector.process_search_response(query, raw_response),
}
def search(query, min_confidence=0.1, return_first=False): def search(query, min_confidence=0.1, return_first=False):
@ -66,13 +80,13 @@ def search(query, min_confidence=0.1, return_first=False):
# load as many results as we can # load as many results as we can
params = {"min_confidence": min_confidence} params = {"min_confidence": min_confidence}
results = asyncio.run(async_connector_search(query, items, params)) results = asyncio.run(async_connector_search(query, items, params))
raise Exception("Hi")
if return_first: if return_first:
# find the best result from all the responses and return that # find the best result from all the responses and return that
raise Exception("Not implemented yet") raise Exception("Not implemented yet")
return results # failed requests will return None, so filter those out
return [r for r in results if r]
def first_search_result(query, min_confidence=0.1): def first_search_result(query, min_confidence=0.1):