forked from mirrors/bookwyrm
Compare commits
1 commit
main
...
async_broa
Author | SHA1 | Date | |
---|---|---|---|
|
c738eaa2c2 |
1 changed files with 34 additions and 23 deletions
|
@ -1,4 +1,5 @@
|
|||
""" activitypub model functionality """
|
||||
import asyncio
|
||||
from base64 import b64encode
|
||||
from collections import namedtuple
|
||||
from functools import reduce
|
||||
|
@ -6,9 +7,8 @@ import json
|
|||
import operator
|
||||
import logging
|
||||
from uuid import uuid4
|
||||
import requests
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
import aiohttp
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Signature import pkcs1_15
|
||||
from Crypto.Hash import SHA256
|
||||
|
@ -510,15 +510,22 @@ def broadcast_task(sender_id, activity, recipients):
|
|||
"""the celery task for broadcast"""
|
||||
user_model = apps.get_model("bookwyrm.User", require_ready=True)
|
||||
sender = user_model.objects.get(id=sender_id)
|
||||
for recipient in recipients:
|
||||
try:
|
||||
sign_and_send(sender, activity, recipient)
|
||||
except RequestException:
|
||||
pass
|
||||
asyncio.run(async_broadcast(recipients, sender, activity))
|
||||
|
||||
|
||||
def sign_and_send(sender, data, destination):
|
||||
"""crpyto whatever and http junk"""
|
||||
async def async_broadcast(recipients, sender, data):
|
||||
"""Send all the broadcasts simultaneously"""
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
tasks = []
|
||||
for recipient in recipients:
|
||||
tasks.append(
|
||||
asyncio.ensure_future(sign_and_send(session, sender, data, recipient))
|
||||
)
|
||||
|
||||
|
||||
async def sign_and_send(session, sender, data, destination):
|
||||
"""Sign the messages and send them in an asynchronous bundle"""
|
||||
now = http_date()
|
||||
|
||||
if not sender.key_pair.private_key:
|
||||
|
@ -526,21 +533,25 @@ def sign_and_send(sender, data, destination):
|
|||
raise ValueError("No private key found for sender")
|
||||
|
||||
digest = make_digest(data)
|
||||
headers = {
|
||||
"Date": now,
|
||||
"Digest": digest,
|
||||
"Signature": make_signature(sender, destination, now, digest),
|
||||
"Content-Type": "application/activity+json; charset=utf-8",
|
||||
"User-Agent": USER_AGENT,
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
destination,
|
||||
data=data,
|
||||
headers={
|
||||
"Date": now,
|
||||
"Digest": digest,
|
||||
"Signature": make_signature(sender, destination, now, digest),
|
||||
"Content-Type": "application/activity+json; charset=utf-8",
|
||||
"User-Agent": USER_AGENT,
|
||||
},
|
||||
)
|
||||
if not response.ok:
|
||||
response.raise_for_status()
|
||||
return response
|
||||
try:
|
||||
async with session.post(destination, data=data, headers=headers) as response:
|
||||
if not response.ok:
|
||||
logger.exception(
|
||||
"Failed to send broadcast to %s: %s", destination, response.reason
|
||||
)
|
||||
return await response
|
||||
except asyncio.TimeoutError:
|
||||
logger.info("Connection timed out for url: %s", destination)
|
||||
except aiohttp.ClientError as err:
|
||||
logger.exception(err)
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
|
|
Loading…
Reference in a new issue