Move broadcast to celery

This commit is contained in:
Mouse Reeve 2020-03-31 18:03:58 -07:00
parent c969e5550e
commit 1970682c9c
3 changed files with 27 additions and 22 deletions

View file

@ -9,6 +9,7 @@ import requests
from urllib.parse import urlparse from urllib.parse import urlparse
from fedireads import models from fedireads import models
from fedireads.tasks import app
def get_recipients(user, post_privacy, direct_recipients=None, limit=False): def get_recipients(user, post_privacy, direct_recipients=None, limit=False):
@ -59,8 +60,10 @@ def get_recipients(user, post_privacy, direct_recipients=None, limit=False):
return recipients return recipients
@app.task
def broadcast(sender, activity, recipients): def broadcast(sender, activity, recipients):
''' send out an event ''' ''' send out an event '''
sender = models.User.objects.get(id=sender)
errors = [] errors = []
for recipient in recipients: for recipient in recipients:
try: try:

View file

@ -81,7 +81,7 @@ def handle_account_search(query):
def handle_follow(user, to_follow): def handle_follow(user, to_follow):
''' someone local wants to follow someone ''' ''' someone local wants to follow someone '''
activity = activitypub.get_follow_request(user, to_follow) activity = activitypub.get_follow_request(user, to_follow)
errors = broadcast(user, activity, [to_follow.inbox]) errors = broadcast.delay(user.id, activity, [to_follow.inbox])
for error in errors: for error in errors:
raise(error['error']) raise(error['error'])
@ -93,7 +93,7 @@ def handle_unfollow(user, to_unfollow):
user_object=to_unfollow user_object=to_unfollow
) )
activity = activitypub.get_unfollow(relationship) activity = activitypub.get_unfollow(relationship)
errors = broadcast(user, activity, [to_unfollow.inbox]) errors = broadcast.delay(user.id, activity, [to_unfollow.inbox])
to_unfollow.followers.remove(user) to_unfollow.followers.remove(user)
for error in errors: for error in errors:
raise(error['error']) raise(error['error'])
@ -108,7 +108,7 @@ def handle_accept(user, to_follow, follow_request):
activity = activitypub.get_accept(to_follow, follow_request) activity = activitypub.get_accept(to_follow, follow_request)
recipient = get_recipients(to_follow, 'direct', direct_recipients=[user]) recipient = get_recipients(to_follow, 'direct', direct_recipients=[user])
broadcast(to_follow, activity, recipient) broadcast.delay(to_follow.id, activity, recipient)
def handle_reject(user, to_follow, relationship): def handle_reject(user, to_follow, relationship):
@ -117,7 +117,7 @@ def handle_reject(user, to_follow, relationship):
activity = activitypub.get_reject(to_follow, relationship) activity = activitypub.get_reject(to_follow, relationship)
recipient = get_recipients(to_follow, 'direct', direct_recipients=[user]) recipient = get_recipients(to_follow, 'direct', direct_recipients=[user])
broadcast(to_follow, activity, recipient) broadcast.delay(to_follow.id, activity, recipient)
def handle_shelve(user, book, shelf): def handle_shelve(user, book, shelf):
@ -127,7 +127,7 @@ def handle_shelve(user, book, shelf):
activity = activitypub.get_add(user, book, shelf) activity = activitypub.get_add(user, book, shelf)
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, activity, recipients) broadcast.delay(user.id, activity, recipients)
# tell the world about this cool thing that happened # tell the world about this cool thing that happened
verb = { verb = {
@ -143,7 +143,7 @@ def handle_shelve(user, book, shelf):
activity = activitypub.get_status(status) activity = activitypub.get_status(status)
create_activity = activitypub.get_create(user, activity) create_activity = activitypub.get_create(user, activity)
broadcast(user, create_activity, recipients) broadcast.delay(user.id, create_activity, recipients)
def handle_unshelve(user, book, shelf): def handle_unshelve(user, book, shelf):
@ -155,7 +155,7 @@ def handle_unshelve(user, book, shelf):
activity = activitypub.get_remove(user, book, shelf) activity = activitypub.get_remove(user, book, shelf)
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, activity, recipients) broadcast.delay(user.id, activity, recipients)
def handle_import_books(user, items): def handle_import_books(user, items):
@ -173,7 +173,7 @@ def handle_import_books(user, items):
new_books.append(item.book) new_books.append(item.book)
activity = activitypub.get_add(user, item.book, desired_shelf) activity = activitypub.get_add(user, item.book, desired_shelf)
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, activity, recipients) broadcast.delay(user.id, activity, recipients)
if new_books: if new_books:
message = 'imported {} books'.format(len(new_books)) message = 'imported {} books'.format(len(new_books))
@ -183,7 +183,8 @@ def handle_import_books(user, items):
create_activity = activitypub.get_create( create_activity = activitypub.get_create(
user, activitypub.get_status(status)) user, activitypub.get_status(status))
broadcast(user, create_activity, get_recipients(user, 'public')) recipients = get_recipients(user, 'public')
broadcast.delay(user.id, create_activity, recipients)
def handle_review(user, book, name, content, rating): def handle_review(user, book, name, content, rating):
@ -194,14 +195,14 @@ def handle_review(user, book, name, content, rating):
review_activity = activitypub.get_review(review) review_activity = activitypub.get_review(review)
review_create_activity = activitypub.get_create(user, review_activity) review_create_activity = activitypub.get_create(user, review_activity)
fr_recipients = get_recipients(user, 'public', limit='fedireads') fr_recipients = get_recipients(user, 'public', limit='fedireads')
broadcast(user, review_create_activity, fr_recipients) broadcast.delay(user.id, review_create_activity, fr_recipients)
# re-format the activity for non-fedireads servers # re-format the activity for non-fedireads servers
article_activity = activitypub.get_review_article(review) article_activity = activitypub.get_review_article(review)
article_create_activity = activitypub.get_create(user, article_activity) article_create_activity = activitypub.get_create(user, article_activity)
other_recipients = get_recipients(user, 'public', limit='other') other_recipients = get_recipients(user, 'public', limit='other')
broadcast(user, article_create_activity, other_recipients) broadcast.delay(user.id, article_create_activity, other_recipients)
def handle_comment(user, book, name, content): def handle_comment(user, book, name, content):
@ -212,14 +213,14 @@ def handle_comment(user, book, name, content):
comment_activity = activitypub.get_comment(comment) comment_activity = activitypub.get_comment(comment)
comment_create_activity = activitypub.get_create(user, comment_activity) comment_create_activity = activitypub.get_create(user, comment_activity)
fr_recipients = get_recipients(user, 'public', limit='fedireads') fr_recipients = get_recipients(user, 'public', limit='fedireads')
broadcast(user, comment_create_activity, fr_recipients) broadcast.delay(user.id, comment_create_activity, fr_recipients)
# re-format the activity for non-fedireads servers # re-format the activity for non-fedireads servers
article_activity = activitypub.get_comment_article(comment) article_activity = activitypub.get_comment_article(comment)
article_create_activity = activitypub.get_create(user, article_activity) article_create_activity = activitypub.get_create(user, article_activity)
other_recipients = get_recipients(user, 'public', limit='other') other_recipients = get_recipients(user, 'public', limit='other')
broadcast(user, article_create_activity, other_recipients) broadcast.delay(user.id, article_create_activity, other_recipients)
def handle_tag(user, book, name): def handle_tag(user, book, name):
@ -228,7 +229,7 @@ def handle_tag(user, book, name):
tag_activity = activitypub.get_add_tag(tag) tag_activity = activitypub.get_add_tag(tag)
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, tag_activity, recipients) broadcast.delay(user.id, tag_activity, recipients)
def handle_untag(user, book, name): def handle_untag(user, book, name):
@ -239,7 +240,7 @@ def handle_untag(user, book, name):
tag.delete() tag.delete()
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, tag_activity, recipients) broadcast.delay(user.id, tag_activity, recipients)
def handle_reply(user, review, content): def handle_reply(user, review, content):
@ -257,7 +258,7 @@ def handle_reply(user, review, content):
create_activity = activitypub.get_create(user, reply_activity) create_activity = activitypub.get_create(user, reply_activity)
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, create_activity, recipients) broadcast.delay(user.id, create_activity, recipients)
def handle_favorite(user, status): def handle_favorite(user, status):
@ -273,7 +274,7 @@ def handle_favorite(user, status):
fav_activity = activitypub.get_favorite(favorite) fav_activity = activitypub.get_favorite(favorite)
recipients = get_recipients(user, 'direct', [status.user]) recipients = get_recipients(user, 'direct', [status.user])
broadcast(user, fav_activity, recipients) broadcast.delay(user.id, fav_activity, recipients)
def handle_unfavorite(user, status): def handle_unfavorite(user, status):
@ -289,7 +290,7 @@ def handle_unfavorite(user, status):
fav_activity = activitypub.get_unfavorite(favorite) fav_activity = activitypub.get_unfavorite(favorite)
recipients = get_recipients(user, 'direct', [status.user]) recipients = get_recipients(user, 'direct', [status.user])
broadcast(user, fav_activity, recipients) broadcast.delay(user.id, fav_activity, recipients)
def handle_boost(user, status): def handle_boost(user, status):
''' a user wishes to boost a status ''' ''' a user wishes to boost a status '''
@ -305,14 +306,14 @@ def handle_boost(user, status):
boost_activity = activitypub.get_boost(boost) boost_activity = activitypub.get_boost(boost)
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, boost_activity, recipients) broadcast.delay(user.id, boost_activity, recipients)
def handle_update_book(user, book): def handle_update_book(user, book):
''' broadcast the news about our book ''' ''' broadcast the news about our book '''
book_activity = activitypub.get_book(book) book_activity = activitypub.get_book(book)
update_activity = activitypub.get_update(user, book_activity) update_activity = activitypub.get_update(user, book_activity)
recipients = get_recipients(None, 'public') recipients = get_recipients(None, 'public')
broadcast(user, update_activity, recipients) broadcast.delay(user.id, update_activity, recipients)
def handle_update_user(user): def handle_update_user(user):
@ -320,5 +321,5 @@ def handle_update_user(user):
actor = activitypub.get_actor(user) actor = activitypub.get_actor(user)
update_activity = activitypub.get_update(user, actor) update_activity = activitypub.get_update(user, actor)
recipients = get_recipients(user, 'public') recipients = get_recipients(user, 'public')
broadcast(user, update_activity, recipients) broadcast.delay(user.id, update_activity, recipients)

View file

@ -18,5 +18,6 @@ app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs. # Load task modules from all registered Django app configs.
app.autodiscover_tasks() app.autodiscover_tasks()
app.autodiscover_tasks(['fedireads'], related_name="incoming") app.autodiscover_tasks(['fedireads'], related_name='incoming')
app.autodiscover_tasks(['fedireads'], related_name='broadcast')