diff --git a/bookwyrm/models/activitypub_mixin.py b/bookwyrm/models/activitypub_mixin.py index 402cb040..56f9da20 100644 --- a/bookwyrm/models/activitypub_mixin.py +++ b/bookwyrm/models/activitypub_mixin.py @@ -1,4 +1,5 @@ """ activitypub model functionality """ +import asyncio from base64 import b64encode from collections import namedtuple from functools import reduce @@ -6,9 +7,8 @@ import json import operator import logging from uuid import uuid4 -import requests -from requests.exceptions import RequestException +import aiohttp from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Hash import SHA256 @@ -510,15 +510,22 @@ def broadcast_task(sender_id, activity, recipients): """the celery task for broadcast""" user_model = apps.get_model("bookwyrm.User", require_ready=True) sender = user_model.objects.get(id=sender_id) - for recipient in recipients: - try: - sign_and_send(sender, activity, recipient) - except RequestException: - pass + asyncio.run(async_broadcast(recipients, sender, activity)) -def sign_and_send(sender, data, destination): - """crpyto whatever and http junk""" +async def async_broadcast(recipients, sender, data): + """Send all the broadcasts simultaneously""" + timeout = aiohttp.ClientTimeout(total=10) + async with aiohttp.ClientSession(timeout=timeout) as session: + tasks = [] + for recipient in recipients: + tasks.append( + asyncio.ensure_future(sign_and_send(session, sender, data, recipient)) + ) + + +async def sign_and_send(session, sender, data, destination): + """Sign the messages and send them in an asynchronous bundle""" now = http_date() if not sender.key_pair.private_key: @@ -526,21 +533,25 @@ def sign_and_send(sender, data, destination): raise ValueError("No private key found for sender") digest = make_digest(data) + headers = { + "Date": now, + "Digest": digest, + "Signature": make_signature(sender, destination, now, digest), + "Content-Type": "application/activity+json; charset=utf-8", + "User-Agent": USER_AGENT, + } - response = requests.post( - destination, - data=data, - headers={ - "Date": now, - "Digest": digest, - "Signature": make_signature(sender, destination, now, digest), - "Content-Type": "application/activity+json; charset=utf-8", - "User-Agent": USER_AGENT, - }, - ) - if not response.ok: - response.raise_for_status() - return response + try: + async with session.post(destination, data=data, headers=headers) as response: + if not response.ok: + logger.exception( + "Failed to send broadcast to %s: %s", destination, response.reason + ) + return await response + except asyncio.TimeoutError: + logger.info("Connection timed out for url: %s", destination) + except aiohttp.ClientError as err: + logger.exception(err) # pylint: disable=unused-argument