Compare commits

...

1 commit

Author SHA1 Message Date
Mouse Reeve
c738eaa2c2 Use async requests for broadcasting
When an activity needs to be broadcast to the whole wide fediverse, the
number of requests can get enormous and the broadcast task ends up
taking ages to run. This change sends these requests out in one aiohttp
session, to improve performance.
2022-05-31 13:09:27 -07:00

View file

@ -1,4 +1,5 @@
""" activitypub model functionality """
import asyncio
from base64 import b64encode
from collections import namedtuple
from functools import reduce
@ -6,9 +7,8 @@ import json
import operator
import logging
from uuid import uuid4
import requests
from requests.exceptions import RequestException
import aiohttp
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
@ -510,15 +510,22 @@ def broadcast_task(sender_id, activity, recipients):
"""the celery task for broadcast"""
user_model = apps.get_model("bookwyrm.User", require_ready=True)
sender = user_model.objects.get(id=sender_id)
for recipient in recipients:
try:
sign_and_send(sender, activity, recipient)
except RequestException:
pass
asyncio.run(async_broadcast(recipients, sender, activity))
def sign_and_send(sender, data, destination):
"""crpyto whatever and http junk"""
async def async_broadcast(recipients, sender, data):
"""Send all the broadcasts simultaneously"""
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = []
for recipient in recipients:
tasks.append(
asyncio.ensure_future(sign_and_send(session, sender, data, recipient))
)
async def sign_and_send(session, sender, data, destination):
"""Sign the messages and send them in an asynchronous bundle"""
now = http_date()
if not sender.key_pair.private_key:
@ -526,21 +533,25 @@ def sign_and_send(sender, data, destination):
raise ValueError("No private key found for sender")
digest = make_digest(data)
headers = {
"Date": now,
"Digest": digest,
"Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT,
}
response = requests.post(
destination,
data=data,
headers={
"Date": now,
"Digest": digest,
"Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT,
},
)
if not response.ok:
response.raise_for_status()
return response
try:
async with session.post(destination, data=data, headers=headers) as response:
if not response.ok:
logger.exception(
"Failed to send broadcast to %s: %s", destination, response.reason
)
return await response
except asyncio.TimeoutError:
logger.info("Connection timed out for url: %s", destination)
except aiohttp.ClientError as err:
logger.exception(err)
# pylint: disable=unused-argument