Merge pull request #2143 from bookwyrm-social/async_broadcast

Use async requests for broadcasting
This commit is contained in:
Mouse Reeve 2022-07-02 10:23:02 -07:00 committed by GitHub
commit be76fe1494
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,4 +1,5 @@
""" activitypub model functionality """
import asyncio
from base64 import b64encode
from collections import namedtuple
from functools import reduce
@ -6,9 +7,8 @@ import json
import operator
import logging
from uuid import uuid4
import requests
from requests.exceptions import RequestException
import aiohttp
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
@ -510,15 +510,22 @@ def broadcast_task(sender_id, activity, recipients):
"""the celery task for broadcast"""
user_model = apps.get_model("bookwyrm.User", require_ready=True)
sender = user_model.objects.get(id=sender_id)
for recipient in recipients:
try:
sign_and_send(sender, activity, recipient)
except RequestException:
pass
asyncio.run(async_broadcast(recipients, sender, activity))
def sign_and_send(sender, data, destination):
"""crpyto whatever and http junk"""
async def async_broadcast(recipients, sender, data):
"""Send all the broadcasts simultaneously"""
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = []
for recipient in recipients:
tasks.append(
asyncio.ensure_future(sign_and_send(session, sender, data, recipient))
)
async def sign_and_send(session, sender, data, destination):
"""Sign the messages and send them in an asynchronous bundle"""
now = http_date()
if not sender.key_pair.private_key:
@ -526,21 +533,25 @@ def sign_and_send(sender, data, destination):
raise ValueError("No private key found for sender")
digest = make_digest(data)
headers = {
"Date": now,
"Digest": digest,
"Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT,
}
response = requests.post(
destination,
data=data,
headers={
"Date": now,
"Digest": digest,
"Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT,
},
)
if not response.ok:
response.raise_for_status()
return response
try:
async with session.post(destination, data=data, headers=headers) as response:
if not response.ok:
logger.exception(
"Failed to send broadcast to %s: %s", destination, response.reason
)
return await response
except asyncio.TimeoutError:
logger.info("Connection timed out for url: %s", destination)
except aiohttp.ClientError as err:
logger.exception(err)
# pylint: disable=unused-argument