Use async requests for broadcasting

When an activity needs to be broadcast to the whole wide fediverse, the
number of requests can get enormous and the broadcast task ends up
taking ages to run. This change sends these requests out in one aiohttp
session, to improve performance.
This commit is contained in:
Mouse Reeve 2022-05-31 13:09:27 -07:00
parent 355e7039f0
commit c738eaa2c2

View file

@ -1,4 +1,5 @@
""" activitypub model functionality """ """ activitypub model functionality """
import asyncio
from base64 import b64encode from base64 import b64encode
from collections import namedtuple from collections import namedtuple
from functools import reduce from functools import reduce
@ -6,9 +7,8 @@ import json
import operator import operator
import logging import logging
from uuid import uuid4 from uuid import uuid4
import requests
from requests.exceptions import RequestException
import aiohttp
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15 from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256 from Crypto.Hash import SHA256
@ -510,15 +510,22 @@ def broadcast_task(sender_id, activity, recipients):
"""the celery task for broadcast""" """the celery task for broadcast"""
user_model = apps.get_model("bookwyrm.User", require_ready=True) user_model = apps.get_model("bookwyrm.User", require_ready=True)
sender = user_model.objects.get(id=sender_id) sender = user_model.objects.get(id=sender_id)
for recipient in recipients: asyncio.run(async_broadcast(recipients, sender, activity))
try:
sign_and_send(sender, activity, recipient)
except RequestException:
pass
def sign_and_send(sender, data, destination): async def async_broadcast(recipients, sender, data):
"""crpyto whatever and http junk""" """Send all the broadcasts simultaneously"""
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = []
for recipient in recipients:
tasks.append(
asyncio.ensure_future(sign_and_send(session, sender, data, recipient))
)
async def sign_and_send(session, sender, data, destination):
"""Sign the messages and send them in an asynchronous bundle"""
now = http_date() now = http_date()
if not sender.key_pair.private_key: if not sender.key_pair.private_key:
@ -526,21 +533,25 @@ def sign_and_send(sender, data, destination):
raise ValueError("No private key found for sender") raise ValueError("No private key found for sender")
digest = make_digest(data) digest = make_digest(data)
headers = {
"Date": now,
"Digest": digest,
"Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT,
}
response = requests.post( try:
destination, async with session.post(destination, data=data, headers=headers) as response:
data=data, if not response.ok:
headers={ logger.exception(
"Date": now, "Failed to send broadcast to %s: %s", destination, response.reason
"Digest": digest, )
"Signature": make_signature(sender, destination, now, digest), return await response
"Content-Type": "application/activity+json; charset=utf-8", except asyncio.TimeoutError:
"User-Agent": USER_AGENT, logger.info("Connection timed out for url: %s", destination)
}, except aiohttp.ClientError as err:
) logger.exception(err)
if not response.ok:
response.raise_for_status()
return response
# pylint: disable=unused-argument # pylint: disable=unused-argument