Merge pull request #2163 from bookwyrm-social/revert-2143-async_broadcast

Revert "Use async requests for broadcasting"
This commit is contained in:
Mouse Reeve 2022-07-02 11:11:48 -07:00 committed by GitHub
commit 43f3d69821
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,5 +1,4 @@
""" activitypub model functionality """ """ activitypub model functionality """
import asyncio
from base64 import b64encode from base64 import b64encode
from collections import namedtuple from collections import namedtuple
from functools import reduce from functools import reduce
@ -7,8 +6,9 @@ import json
import operator import operator
import logging import logging
from uuid import uuid4 from uuid import uuid4
import requests
from requests.exceptions import RequestException
import aiohttp
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15 from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256 from Crypto.Hash import SHA256
@ -510,22 +510,15 @@ def broadcast_task(sender_id, activity, recipients):
"""the celery task for broadcast""" """the celery task for broadcast"""
user_model = apps.get_model("bookwyrm.User", require_ready=True) user_model = apps.get_model("bookwyrm.User", require_ready=True)
sender = user_model.objects.get(id=sender_id) sender = user_model.objects.get(id=sender_id)
asyncio.run(async_broadcast(recipients, sender, activity))
async def async_broadcast(recipients, sender, data):
"""Send all the broadcasts simultaneously"""
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = []
for recipient in recipients: for recipient in recipients:
tasks.append( try:
asyncio.ensure_future(sign_and_send(session, sender, data, recipient)) sign_and_send(sender, activity, recipient)
) except RequestException:
pass
async def sign_and_send(session, sender, data, destination): def sign_and_send(sender, data, destination):
"""Sign the messages and send them in an asynchronous bundle""" """crpyto whatever and http junk"""
now = http_date() now = http_date()
if not sender.key_pair.private_key: if not sender.key_pair.private_key:
@ -533,25 +526,21 @@ async def sign_and_send(session, sender, data, destination):
raise ValueError("No private key found for sender") raise ValueError("No private key found for sender")
digest = make_digest(data) digest = make_digest(data)
headers = {
response = requests.post(
destination,
data=data,
headers={
"Date": now, "Date": now,
"Digest": digest, "Digest": digest,
"Signature": make_signature(sender, destination, now, digest), "Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8", "Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT, "User-Agent": USER_AGENT,
} },
try:
async with session.post(destination, data=data, headers=headers) as response:
if not response.ok:
logger.exception(
"Failed to send broadcast to %s: %s", destination, response.reason
) )
return await response if not response.ok:
except asyncio.TimeoutError: response.raise_for_status()
logger.info("Connection timed out for url: %s", destination) return response
except aiohttp.ClientError as err:
logger.exception(err)
# pylint: disable=unused-argument # pylint: disable=unused-argument