Adds enum for queue names

This commit is contained in:
Mouse Reeve 2021-11-11 19:10:22 -08:00
parent 6aa57d4d34
commit 9fee860b00
2 changed files with 19 additions and 13 deletions

View file

@ -20,7 +20,7 @@ from django.utils.http import http_date
from bookwyrm import activitypub from bookwyrm import activitypub
from bookwyrm.settings import USER_AGENT, PAGE_LENGTH from bookwyrm.settings import USER_AGENT, PAGE_LENGTH
from bookwyrm.signatures import make_signature, make_digest from bookwyrm.signatures import make_signature, make_digest
from bookwyrm.tasks import app from bookwyrm.tasks import app, MEDIUM
from bookwyrm.models.fields import ImageField, ManyToManyField from bookwyrm.models.fields import ImageField, ManyToManyField
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -29,7 +29,6 @@ logger = logging.getLogger(__name__)
PropertyField = namedtuple("PropertyField", ("set_activity_from_field")) PropertyField = namedtuple("PropertyField", ("set_activity_from_field"))
# pylint: disable=invalid-name # pylint: disable=invalid-name
def set_activity_from_property_field(activity, obj, field): def set_activity_from_property_field(activity, obj, field):
"""assign a model property value to the activity json""" """assign a model property value to the activity json"""
@ -126,7 +125,7 @@ class ActivitypubMixin:
# there OUGHT to be only one match # there OUGHT to be only one match
return match.first() return match.first()
def broadcast(self, activity, sender, software=None, queue="medium_priority"): def broadcast(self, activity, sender, software=None, queue=MEDIUM):
"""send out an activity""" """send out an activity"""
broadcast_task.apply_async( broadcast_task.apply_async(
args=( args=(
@ -198,7 +197,7 @@ class ActivitypubMixin:
class ObjectMixin(ActivitypubMixin): class ObjectMixin(ActivitypubMixin):
"""add this mixin for object models that are AP serializable""" """add this mixin for object models that are AP serializable"""
def save(self, *args, created=None, software=None, **kwargs): def save(self, *args, created=None, software=None, priority=MEDIUM, **kwargs):
"""broadcast created/updated/deleted objects as appropriate""" """broadcast created/updated/deleted objects as appropriate"""
broadcast = kwargs.get("broadcast", True) broadcast = kwargs.get("broadcast", True)
# this bonus kwarg would cause an error in the base save method # this bonus kwarg would cause an error in the base save method
@ -225,12 +224,14 @@ class ObjectMixin(ActivitypubMixin):
# do we have a "pure" activitypub version of this for mastodon? # do we have a "pure" activitypub version of this for mastodon?
if software != "bookwyrm" and hasattr(self, "pure_content"): if software != "bookwyrm" and hasattr(self, "pure_content"):
pure_activity = self.to_create_activity(user, pure=True) pure_activity = self.to_create_activity(user, pure=True)
self.broadcast(pure_activity, user, software="other") self.broadcast(
pure_activity, user, software="other", queue=priority
)
# set bookwyrm so that that type is also sent # set bookwyrm so that that type is also sent
software = "bookwyrm" software = "bookwyrm"
# sends to BW only if we just did a pure version for masto # sends to BW only if we just did a pure version for masto
activity = self.to_create_activity(user) activity = self.to_create_activity(user)
self.broadcast(activity, user, software=software) self.broadcast(activity, user, software=software, queue=priority)
except AttributeError: except AttributeError:
# janky as heck, this catches the mutliple inheritence chain # janky as heck, this catches the mutliple inheritence chain
# for boosts and ignores this auxilliary broadcast # for boosts and ignores this auxilliary broadcast
@ -254,7 +255,7 @@ class ObjectMixin(ActivitypubMixin):
activity = self.to_delete_activity(user) activity = self.to_delete_activity(user)
else: else:
activity = self.to_update_activity(user) activity = self.to_update_activity(user)
self.broadcast(activity, user) self.broadcast(activity, user, queue=priority)
def to_create_activity(self, user, **kwargs): def to_create_activity(self, user, **kwargs):
"""returns the object wrapped in a Create activity""" """returns the object wrapped in a Create activity"""
@ -377,7 +378,7 @@ class CollectionItemMixin(ActivitypubMixin):
activity_serializer = activitypub.CollectionItem activity_serializer = activitypub.CollectionItem
def broadcast(self, activity, sender, software="bookwyrm", queue="medium_priority"): def broadcast(self, activity, sender, software="bookwyrm", queue=MEDIUM):
"""only send book collection updates to other bookwyrm instances""" """only send book collection updates to other bookwyrm instances"""
super().broadcast(activity, sender, software=software, queue=queue) super().broadcast(activity, sender, software=software, queue=queue)
@ -398,7 +399,7 @@ class CollectionItemMixin(ActivitypubMixin):
return [] return []
return [collection_field.user] return [collection_field.user]
def save(self, *args, broadcast=True, **kwargs): def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs):
"""broadcast updated""" """broadcast updated"""
# first off, we want to save normally no matter what # first off, we want to save normally no matter what
super().save(*args, **kwargs) super().save(*args, **kwargs)
@ -409,7 +410,7 @@ class CollectionItemMixin(ActivitypubMixin):
# adding an obj to the collection # adding an obj to the collection
activity = self.to_add_activity(self.user) activity = self.to_add_activity(self.user)
self.broadcast(activity, self.user, queue="low_priority") self.broadcast(activity, self.user, queue=priority)
def delete(self, *args, broadcast=True, **kwargs): def delete(self, *args, broadcast=True, **kwargs):
"""broadcast a remove activity""" """broadcast a remove activity"""
@ -442,12 +443,12 @@ class CollectionItemMixin(ActivitypubMixin):
class ActivityMixin(ActivitypubMixin): class ActivityMixin(ActivitypubMixin):
"""add this mixin for models that are AP serializable""" """add this mixin for models that are AP serializable"""
def save(self, *args, broadcast=True, **kwargs): def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs):
"""broadcast activity""" """broadcast activity"""
super().save(*args, **kwargs) super().save(*args, **kwargs)
user = self.user if hasattr(self, "user") else self.user_subject user = self.user if hasattr(self, "user") else self.user_subject
if broadcast and user.local: if broadcast and user.local:
self.broadcast(self.to_activity(), user) self.broadcast(self.to_activity(), user, queue=priority)
def delete(self, *args, broadcast=True, **kwargs): def delete(self, *args, broadcast=True, **kwargs):
"""nevermind, undo that activity""" """nevermind, undo that activity"""
@ -504,7 +505,7 @@ def unfurl_related_field(related_field, sort_field=None):
return related_field.remote_id return related_field.remote_id
@app.task(queue="medium_priority") @app.task(queue=MEDIUM)
def broadcast_task(sender_id, activity, recipients): def broadcast_task(sender_id, activity, recipients):
"""the celery task for broadcast""" """the celery task for broadcast"""
user_model = apps.get_model("bookwyrm.User", require_ready=True) user_model = apps.get_model("bookwyrm.User", require_ready=True)

View file

@ -9,3 +9,8 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celerywyrm.settings")
app = Celery( app = Celery(
"tasks", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND "tasks", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND
) )
# priorities
LOW = "low_priority"
MEDIUM = "medium_priority"
HIGH = "high_priority"