From c738eaa2c2a04f8da4886bb4198063952891e8a3 Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Tue, 31 May 2022 13:09:27 -0700 Subject: [PATCH] Use async requests for broadcasting When an activity needs to be broadcast to the whole wide fediverse, the number of requests can get enormous and the broadcast task ends up taking ages to run. This change sends these requests out in one aiohttp session, to improve performance. --- bookwyrm/models/activitypub_mixin.py | 57 +++++++++++++++++----------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/bookwyrm/models/activitypub_mixin.py b/bookwyrm/models/activitypub_mixin.py index 402cb040b..56f9da208 100644 --- a/bookwyrm/models/activitypub_mixin.py +++ b/bookwyrm/models/activitypub_mixin.py @@ -1,4 +1,5 @@ """ activitypub model functionality """ +import asyncio from base64 import b64encode from collections import namedtuple from functools import reduce @@ -6,9 +7,8 @@ import json import operator import logging from uuid import uuid4 -import requests -from requests.exceptions import RequestException +import aiohttp from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Hash import SHA256 @@ -510,15 +510,22 @@ def broadcast_task(sender_id, activity, recipients): """the celery task for broadcast""" user_model = apps.get_model("bookwyrm.User", require_ready=True) sender = user_model.objects.get(id=sender_id) - for recipient in recipients: - try: - sign_and_send(sender, activity, recipient) - except RequestException: - pass + asyncio.run(async_broadcast(recipients, sender, activity)) -def sign_and_send(sender, data, destination): - """crpyto whatever and http junk""" +async def async_broadcast(recipients, sender, data): + """Send all the broadcasts simultaneously""" + timeout = aiohttp.ClientTimeout(total=10) + async with aiohttp.ClientSession(timeout=timeout) as session: + tasks = [] + for recipient in recipients: + tasks.append( + asyncio.ensure_future(sign_and_send(session, sender, data, recipient)) + ) + + +async def sign_and_send(session, sender, data, destination): + """Sign the messages and send them in an asynchronous bundle""" now = http_date() if not sender.key_pair.private_key: @@ -526,21 +533,25 @@ def sign_and_send(sender, data, destination): raise ValueError("No private key found for sender") digest = make_digest(data) + headers = { + "Date": now, + "Digest": digest, + "Signature": make_signature(sender, destination, now, digest), + "Content-Type": "application/activity+json; charset=utf-8", + "User-Agent": USER_AGENT, + } - response = requests.post( - destination, - data=data, - headers={ - "Date": now, - "Digest": digest, - "Signature": make_signature(sender, destination, now, digest), - "Content-Type": "application/activity+json; charset=utf-8", - "User-Agent": USER_AGENT, - }, - ) - if not response.ok: - response.raise_for_status() - return response + try: + async with session.post(destination, data=data, headers=headers) as response: + if not response.ok: + logger.exception( + "Failed to send broadcast to %s: %s", destination, response.reason + ) + return await response + except asyncio.TimeoutError: + logger.info("Connection timed out for url: %s", destination) + except aiohttp.ClientError as err: + logger.exception(err) # pylint: disable=unused-argument