Move activitystreams updates to tasks

This commit is contained in:
Mouse Reeve 2021-08-04 19:54:47 -07:00
parent 699d9307e3
commit 1fbca70820
2 changed files with 90 additions and 27 deletions

View file

@ -1,9 +1,11 @@
""" access the activity streams stored in redis """ """ access the activity streams stored in redis """
from django.dispatch import receiver from django.dispatch import receiver
from django.db import transaction
from django.db.models import signals, Q from django.db.models import signals, Q
from bookwyrm import models from bookwyrm import models
from bookwyrm.redis_store import RedisStore, r from bookwyrm.redis_store import RedisStore, r
from bookwyrm.tasks import app
from bookwyrm.views.helpers import privacy_filter from bookwyrm.views.helpers import privacy_filter
@ -190,16 +192,20 @@ def add_status_on_create(sender, instance, created, *args, **kwargs):
return return
if instance.deleted: if instance.deleted:
for stream in streams.values(): remove_status_task.delay(instance.id)
stream.remove_object_from_related_stores(instance)
return return
if not created: if not created:
return return
# when creating new things, gotta wait on the transaction
transaction.on_commit(lambda: add_status_on_create_command(sender, instance))
def add_status_on_create_command(sender, instance):
"""runs this code only after the database commit completes"""
# iterates through Home, Local, Federated # iterates through Home, Local, Federated
for stream in streams.values(): add_status_task.delay(instance.id)
stream.add_status(instance)
if sender != models.Boost: if sender != models.Boost:
return return
@ -208,23 +214,19 @@ def add_status_on_create(sender, instance, created, *args, **kwargs):
old_versions = models.Boost.objects.filter( old_versions = models.Boost.objects.filter(
boosted_status__id=boosted.id, boosted_status__id=boosted.id,
created_date__lt=instance.created_date, created_date__lt=instance.created_date,
) ).values_list("id", flat=True)
for stream in streams.values(): remove_status_task.delay(boosted.id)
stream.remove_object_from_related_stores(boosted) remove_status_task.delay(old_versions)
for status in old_versions:
stream.remove_object_from_related_stores(status)
@receiver(signals.post_delete, sender=models.Boost) @receiver(signals.post_delete, sender=models.Boost)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def remove_boost_on_delete(sender, instance, *args, **kwargs): def remove_boost_on_delete(sender, instance, *args, **kwargs):
"""boosts are deleted""" """boosts are deleted"""
# we're only interested in new statuses # remove the boost
for stream in streams.values(): remove_status_task.delay(instance.id)
# remove the boost # re-add the original status
stream.remove_object_from_related_stores(instance) add_status_task.delay(instance.boosted_status.id)
# re-add the original status
stream.add_status(instance.boosted_status)
@receiver(signals.post_save, sender=models.UserFollows) @receiver(signals.post_save, sender=models.UserFollows)
@ -233,7 +235,9 @@ def add_statuses_on_follow(sender, instance, created, *args, **kwargs):
"""add a newly followed user's statuses to feeds""" """add a newly followed user's statuses to feeds"""
if not created or not instance.user_subject.local: if not created or not instance.user_subject.local:
return return
HomeStream().add_user_statuses(instance.user_subject, instance.user_object) add_user_statuses_task.delay(
instance.user_subject.id, instance.user_object.id, stream_list=["home"]
)
@receiver(signals.post_delete, sender=models.UserFollows) @receiver(signals.post_delete, sender=models.UserFollows)
@ -242,7 +246,9 @@ def remove_statuses_on_unfollow(sender, instance, *args, **kwargs):
"""remove statuses from a feed on unfollow""" """remove statuses from a feed on unfollow"""
if not instance.user_subject.local: if not instance.user_subject.local:
return return
HomeStream().remove_user_statuses(instance.user_subject, instance.user_object) remove_user_statuses_task.delay(
instance.user_subject.id, instance.user_object.id, stream_list=["home"]
)
@receiver(signals.post_save, sender=models.UserBlocks) @receiver(signals.post_save, sender=models.UserBlocks)
@ -251,29 +257,36 @@ def remove_statuses_on_block(sender, instance, *args, **kwargs):
"""remove statuses from all feeds on block""" """remove statuses from all feeds on block"""
# blocks apply ot all feeds # blocks apply ot all feeds
if instance.user_subject.local: if instance.user_subject.local:
for stream in streams.values(): remove_user_statuses_task.delay(
stream.remove_user_statuses(instance.user_subject, instance.user_object) instance.user_subject.id, instance.user_object.id
)
# and in both directions # and in both directions
if instance.user_object.local: if instance.user_object.local:
for stream in streams.values(): remove_user_statuses_task.delay(
stream.remove_user_statuses(instance.user_object, instance.user_subject) instance.user_object.id, instance.user_subject.id
)
@receiver(signals.post_delete, sender=models.UserBlocks) @receiver(signals.post_delete, sender=models.UserBlocks)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def add_statuses_on_unblock(sender, instance, *args, **kwargs): def add_statuses_on_unblock(sender, instance, *args, **kwargs):
"""remove statuses from all feeds on block""" """remove statuses from all feeds on block"""
public_streams = [LocalStream(), FederatedStream()]
# add statuses back to streams with statuses from anyone # add statuses back to streams with statuses from anyone
if instance.user_subject.local: if instance.user_subject.local:
for stream in public_streams: add_user_statuses_task.delay(
stream.add_user_statuses(instance.user_subject, instance.user_object) instance.user_subject.id,
instance.user_object.id,
stream_list=["local", "federated"],
)
# add statuses back to streams with statuses from anyone # add statuses back to streams with statuses from anyone
if instance.user_object.local: if instance.user_object.local:
for stream in public_streams: add_user_statuses_task.delay(
stream.add_user_statuses(instance.user_object, instance.user_subject) instance.user_object.id,
instance.user_subject.id,
stream_list=["local", "federated"],
)
@receiver(signals.post_save, sender=models.User) @receiver(signals.post_save, sender=models.User)
@ -283,5 +296,54 @@ def populate_streams_on_account_create(sender, instance, created, *args, **kwarg
if not created or not instance.local: if not created or not instance.local:
return return
populate_streams_task.delay(instance.id)
# ---- TASKS
@app.task
def populate_streams_task(user_id):
"""create a user's streams"""
user = models.User.objects.get(id=user_id)
for stream in streams.values(): for stream in streams.values():
stream.populate_streams(instance) stream.populate_streams(user)
@app.task
def remove_status_task(status_ids):
"""remove a status from any stream it might be in"""
# this can take an id or a list of ids
if not isinstance(status_ids, list):
status_ids = [status_ids]
statuses = models.Status.objects.filter(id__in=status_ids)
for stream in streams.values():
for status in statuses:
stream.remove_object_from_related_stores(status)
@app.task
def add_status_task(status_id):
"""remove a status from any stream it might be in"""
status = models.Status.objects.get(id=status_id)
for stream in streams.values():
stream.add_status(status)
@app.task
def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
"""remove all statuses by a user from a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
viewer = models.User.objects.get(id=viewer_id)
user = models.User.objects.get(id=user_id)
for stream in stream_list:
stream.remove_user_statuses(viewer, user)
@app.task
def add_user_statuses_task(viewer_id, user_id, stream_list=None):
"""remove all statuses by a user from a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
viewer = models.User.objects.get(id=viewer_id)
user = models.User.objects.get(id=user_id)
for stream in stream_list:
stream.add_user_statuses(viewer, user)

View file

@ -20,6 +20,7 @@ app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django app configs. # Load task modules from all registered Django app configs.
app.autodiscover_tasks() app.autodiscover_tasks()
app.autodiscover_tasks(["bookwyrm"], related_name="activitypub.base_activity") app.autodiscover_tasks(["bookwyrm"], related_name="activitypub.base_activity")
app.autodiscover_tasks(["bookwyrm"], related_name="activitystreams")
app.autodiscover_tasks(["bookwyrm"], related_name="broadcast") app.autodiscover_tasks(["bookwyrm"], related_name="broadcast")
app.autodiscover_tasks(["bookwyrm"], related_name="connectors.abstract_connector") app.autodiscover_tasks(["bookwyrm"], related_name="connectors.abstract_connector")
app.autodiscover_tasks(["bookwyrm"], related_name="emailing") app.autodiscover_tasks(["bookwyrm"], related_name="emailing")