From 45f2199c7120e20d4c098676b0d0b926eadd7e40 Mon Sep 17 00:00:00 2001
From: Mouse Reeve
Date: Mon, 30 May 2022 11:58:39 -0700
Subject: [PATCH] Gather and wait on async requests

This sends out the request tasks all at once and then aggregates the
results, instead of awaiting each request one after another.
---
 bookwyrm/connectors/connector_manager.py | 50 +++++++++++++++---------
 1 file changed, 32 insertions(+), 18 deletions(-)

diff --git a/bookwyrm/connectors/connector_manager.py b/bookwyrm/connectors/connector_manager.py
index a94cdbc5..23cde632 100644
--- a/bookwyrm/connectors/connector_manager.py
+++ b/bookwyrm/connectors/connector_manager.py
@@ -12,7 +12,7 @@ from django.db.models import signals
 from requests import HTTPError
 
 from bookwyrm import book_search, models
-from bookwyrm.settings import SEARCH_TIMEOUT, USER_AGENT
+from bookwyrm.settings import QUERY_TIMEOUT, USER_AGENT
 from bookwyrm.tasks import app
 
 logger = logging.getLogger(__name__)
@@ -22,9 +22,8 @@ class ConnectorException(HTTPError):
     """when the connector can't do what was asked"""
 
 
-async def async_connector_search(query, items, params):
-    """Try a number of requests simultaneously"""
-    timeout = aiohttp.ClientTimeout(total=SEARCH_TIMEOUT)
+async def get_results(session, url, params, query, connector):
+    """try this specific connector"""
     # pylint: disable=line-too-long
     headers = {
         "Accept": (
@@ -32,21 +31,36 @@ async def async_connector_search(query, items, params):
         ),
         "User-Agent": USER_AGENT,
     }
+    try:
+        async with session.get(url, headers=headers, params=params) as response:
+            try:
+                raw_data = await response.json()
+            except aiohttp.client_exceptions.ContentTypeError as err:
+                logger.exception(err)
+                return None
+
+            return {
+                "connector": connector,
+                "results": connector.process_search_response(query, raw_data),
+            }
+    except asyncio.TimeoutError:
+        logger.exception("Connection timeout for url: %s", url)
+
+
+async def async_connector_search(query, items, params):
+    """Try a number of requests simultaneously"""
+    timeout = aiohttp.ClientTimeout(total=QUERY_TIMEOUT)
     async with aiohttp.ClientSession(timeout=timeout) as session:
+        tasks = []
         for url, connector in items:
-            url = connector.get_search_url(query)
-            # raise_not_valid_url(url)
+            tasks.append(
+                asyncio.ensure_future(
+                    get_results(session, url, params, query, connector)
+                )
+            )
 
-            async with session.get(url, headers=headers, params=params) as response:
-                print("Status:", response.status)
-                print(response.ok)
-                print("Content-type:", response.headers["content-type"])
-
-                raw_response = await response.json()
-                return {
-                    "connector": connector,
-                    "results": connector.process_search_response(query, raw_response),
-                }
+        results = await asyncio.gather(*tasks)
+        return results
 
 
 def search(query, min_confidence=0.1, return_first=False):
@@ -66,13 +80,13 @@ def search(query, min_confidence=0.1, return_first=False):
     # load as many results as we can
     params = {"min_confidence": min_confidence}
     results = asyncio.run(async_connector_search(query, items, params))
-    raise Exception("Hi")
 
     if return_first:
         # find the best result from all the responses and return that
         raise Exception("Not implemented yet")
 
-    return results
+    # failed requests will return None, so filter those out
+    return [r for r in results if r]
 
 
 def first_search_result(query, min_confidence=0.1):
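
A minimal, standalone sketch of the gather-and-wait pattern this patch
adopts: one coroutine per endpoint shares a single aiohttp.ClientSession,
all tasks are scheduled up front, awaited together with asyncio.gather,
and failed requests come back as None to be filtered out. The endpoint
URL, the fetch_one/fetch_all names, and the 5-second timeout here are
illustrative assumptions, not bookwyrm code.

    import asyncio

    import aiohttp

    async def fetch_one(session, url, params):
        """Fetch one endpoint; swallow timeouts, connection errors,
        and non-JSON payloads so one bad endpoint can't sink the rest"""
        try:
            async with session.get(url, params=params) as response:
                try:
                    return await response.json()
                except aiohttp.ContentTypeError:
                    # the endpoint answered with something that isn't JSON
                    return None
        except (asyncio.TimeoutError, aiohttp.ClientError):
            # this endpoint was too slow or unreachable
            return None

    async def fetch_all(urls, params):
        """Schedule every request at once, then wait for all of them"""
        timeout = aiohttp.ClientTimeout(total=5)  # assumption: 5s budget
        async with aiohttp.ClientSession(timeout=timeout) as session:
            tasks = [
                asyncio.ensure_future(fetch_one(session, url, params))
                for url in urls
            ]
            results = await asyncio.gather(*tasks)
        # failed requests returned None, so filter those out
        return [r for r in results if r]

    if __name__ == "__main__":
        urls = ["https://example.com/search"]  # hypothetical endpoint
        print(asyncio.run(fetch_all(urls, {"q": "example"})))

With this shape the total wall-clock time is roughly that of the slowest
endpoint rather than the sum of all of them, which is the point of the
change.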
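
One design note, offered as a hedged alternative rather than what the
patch does: here each task catches its own exceptions and returns None,
which keeps a single failure from tearing down asyncio.gather.
asyncio.gather(..., return_exceptions=True) is a built-in way to get the
same resilience; exceptions come back as values and can be filtered much
like the Nones above. The flaky/main names below are illustrative only.

    import asyncio

    async def flaky(i):
        if i == 1:
            raise RuntimeError("connector failed")
        return i

    async def main():
        # return_exceptions=True: a failing task no longer raises out of
        # gather; its exception is returned as a result value instead
        results = await asyncio.gather(
            *(flaky(i) for i in range(3)), return_exceptions=True
        )
        return [r for r in results if not isinstance(r, BaseException)]

    print(asyncio.run(main()))  # -> [0, 2]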