From 3e78e398c04117d5f72dbd6eb03f949d85113a23 Mon Sep 17 00:00:00 2001 From: Wesley Aptekar-Cassels Date: Thu, 20 Jul 2023 00:16:38 -0400 Subject: [PATCH] Switch from priority queues to function-based queues Fixes: #2907 --- bookwyrm/activitypub/base_activity.py | 4 +- bookwyrm/activitystreams.py | 22 ++--- bookwyrm/connectors/connector_manager.py | 6 +- bookwyrm/emailing.py | 4 +- bookwyrm/lists_stream.py | 12 +-- bookwyrm/models/activitypub_mixin.py | 8 +- bookwyrm/models/antispam.py | 4 +- bookwyrm/models/import_job.py | 8 +- bookwyrm/models/relationship.py | 7 +- bookwyrm/models/shelf.py | 6 +- bookwyrm/models/user.py | 6 +- bookwyrm/preview_images.py | 10 +-- bookwyrm/suggested_users.py | 14 ++-- bookwyrm/tasks.py | 14 +++- bookwyrm/templates/settings/celery.html | 82 ++++++++++++++++--- .../tests/activitystreams/test_signals.py | 6 +- bookwyrm/tests/importers/test_importer.py | 2 +- bookwyrm/views/admin/celery_status.py | 36 +++++++- bookwyrm/views/inbox.py | 14 ++-- contrib/systemd/bookwyrm-worker.service | 2 +- docker-compose.yml | 3 +- 21 files changed, 183 insertions(+), 87 deletions(-) diff --git a/bookwyrm/activitypub/base_activity.py b/bookwyrm/activitypub/base_activity.py index 9b7897eba..c78b4f195 100644 --- a/bookwyrm/activitypub/base_activity.py +++ b/bookwyrm/activitypub/base_activity.py @@ -12,7 +12,7 @@ from bookwyrm import models from bookwyrm.connectors import ConnectorException, get_data from bookwyrm.signatures import make_signature from bookwyrm.settings import DOMAIN, INSTANCE_ACTOR_USERNAME -from bookwyrm.tasks import app, MEDIUM +from bookwyrm.tasks import app, MISC logger = logging.getLogger(__name__) @@ -241,7 +241,7 @@ class ActivityObject: return data -@app.task(queue=MEDIUM) +@app.task(queue=MISC) @transaction.atomic def set_related_field( model_name, origin_model_name, related_field_name, related_remote_id, data diff --git a/bookwyrm/activitystreams.py b/bookwyrm/activitystreams.py index 5d581d564..7afa69921 100644 --- a/bookwyrm/activitystreams.py +++ b/bookwyrm/activitystreams.py @@ -8,7 +8,7 @@ from opentelemetry import trace from bookwyrm import models from bookwyrm.redis_store import RedisStore, r -from bookwyrm.tasks import app, LOW, MEDIUM, HIGH +from bookwyrm.tasks import app, STREAMS, IMPORT_TRIGGERED from bookwyrm.telemetry import open_telemetry @@ -343,7 +343,7 @@ def add_status_on_create(sender, instance, created, *args, **kwargs): def add_status_on_create_command(sender, instance, created): """runs this code only after the database commit completes""" - priority = HIGH + priority = STREAMS # check if this is an old status, de-prioritize if so # (this will happen if federation is very slow, or, more expectedly, on csv import) if instance.published_date < timezone.now() - timedelta( @@ -353,7 +353,7 @@ def add_status_on_create_command(sender, instance, created): if instance.user.local: return # an out of date remote status is a low priority but should be added - priority = LOW + priority = IMPORT_TRIGGERED add_status_task.apply_async( args=(instance.id,), @@ -497,7 +497,7 @@ def remove_statuses_on_unshelve(sender, instance, *args, **kwargs): # ---- TASKS -@app.task(queue=LOW) +@app.task(queue=STREAMS) def add_book_statuses_task(user_id, book_id): """add statuses related to a book on shelve""" user = models.User.objects.get(id=user_id) @@ -505,7 +505,7 @@ def add_book_statuses_task(user_id, book_id): BooksStream().add_book_statuses(user, book) -@app.task(queue=LOW) +@app.task(queue=STREAMS) def remove_book_statuses_task(user_id, book_id): """remove statuses about a book from a user's books feed""" user = models.User.objects.get(id=user_id) @@ -513,7 +513,7 @@ def remove_book_statuses_task(user_id, book_id): BooksStream().remove_book_statuses(user, book) -@app.task(queue=MEDIUM) +@app.task(queue=STREAMS) def populate_stream_task(stream, user_id): """background task for populating an empty activitystream""" user = models.User.objects.get(id=user_id) @@ -521,7 +521,7 @@ def populate_stream_task(stream, user_id): stream.populate_streams(user) -@app.task(queue=MEDIUM) +@app.task(queue=STREAMS) def remove_status_task(status_ids): """remove a status from any stream it might be in""" # this can take an id or a list of ids @@ -536,7 +536,7 @@ def remove_status_task(status_ids): ) -@app.task(queue=HIGH) +@app.task(queue=STREAMS) def add_status_task(status_id, increment_unread=False): """add a status to any stream it should be in""" status = models.Status.objects.select_subclasses().get(id=status_id) @@ -548,7 +548,7 @@ def add_status_task(status_id, increment_unread=False): stream.add_status(status, increment_unread=increment_unread) -@app.task(queue=MEDIUM) +@app.task(queue=STREAMS) def remove_user_statuses_task(viewer_id, user_id, stream_list=None): """remove all statuses by a user from a viewer's stream""" stream_list = [streams[s] for s in stream_list] if stream_list else streams.values() @@ -558,7 +558,7 @@ def remove_user_statuses_task(viewer_id, user_id, stream_list=None): stream.remove_user_statuses(viewer, user) -@app.task(queue=MEDIUM) +@app.task(queue=STREAMS) def add_user_statuses_task(viewer_id, user_id, stream_list=None): """add all statuses by a user to a viewer's stream""" stream_list = [streams[s] for s in stream_list] if stream_list else streams.values() @@ -568,7 +568,7 @@ def add_user_statuses_task(viewer_id, user_id, stream_list=None): stream.add_user_statuses(viewer, user) -@app.task(queue=MEDIUM) +@app.task(queue=STREAMS) def handle_boost_task(boost_id): """remove the original post and other, earlier boosts""" instance = models.Status.objects.get(id=boost_id) diff --git a/bookwyrm/connectors/connector_manager.py b/bookwyrm/connectors/connector_manager.py index 7e823c0af..e32da7c00 100644 --- a/bookwyrm/connectors/connector_manager.py +++ b/bookwyrm/connectors/connector_manager.py @@ -13,7 +13,7 @@ from requests import HTTPError from bookwyrm import book_search, models from bookwyrm.settings import SEARCH_TIMEOUT -from bookwyrm.tasks import app, LOW +from bookwyrm.tasks import app, CONNECTORS logger = logging.getLogger(__name__) @@ -109,7 +109,7 @@ def get_or_create_connector(remote_id): return load_connector(connector_info) -@app.task(queue=LOW) +@app.task(queue=CONNECTORS) def load_more_data(connector_id, book_id): """background the work of getting all 10,000 editions of LoTR""" connector_info = models.Connector.objects.get(id=connector_id) @@ -118,7 +118,7 @@ def load_more_data(connector_id, book_id): connector.expand_book_data(book) -@app.task(queue=LOW) +@app.task(queue=CONNECTORS) def create_edition_task(connector_id, work_id, data): """separate task for each of the 10,000 editions of LoTR""" connector_info = models.Connector.objects.get(id=connector_id) diff --git a/bookwyrm/emailing.py b/bookwyrm/emailing.py index 2271077b1..5e08ebba1 100644 --- a/bookwyrm/emailing.py +++ b/bookwyrm/emailing.py @@ -3,7 +3,7 @@ from django.core.mail import EmailMultiAlternatives from django.template.loader import get_template from bookwyrm import models, settings -from bookwyrm.tasks import app, HIGH +from bookwyrm.tasks import app, EMAIL from bookwyrm.settings import DOMAIN @@ -75,7 +75,7 @@ def format_email(email_name, data): return (subject, html_content, text_content) -@app.task(queue=HIGH) +@app.task(queue=EMAIL) def send_email(recipient, subject, html_content, text_content): """use a task to send the email""" email = EmailMultiAlternatives( diff --git a/bookwyrm/lists_stream.py b/bookwyrm/lists_stream.py index 2b08010b1..148b81a78 100644 --- a/bookwyrm/lists_stream.py +++ b/bookwyrm/lists_stream.py @@ -5,7 +5,7 @@ from django.db.models import signals, Count, Q from bookwyrm import models from bookwyrm.redis_store import RedisStore -from bookwyrm.tasks import app, MEDIUM, HIGH +from bookwyrm.tasks import app, LISTS class ListsStream(RedisStore): @@ -217,14 +217,14 @@ def add_list_on_account_create_command(user_id): # ---- TASKS -@app.task(queue=MEDIUM) +@app.task(queue=LISTS) def populate_lists_task(user_id): """background task for populating an empty list stream""" user = models.User.objects.get(id=user_id) ListsStream().populate_lists(user) -@app.task(queue=MEDIUM) +@app.task(queue=LISTS) def remove_list_task(list_id, re_add=False): """remove a list from any stream it might be in""" stores = models.User.objects.filter(local=True, is_active=True).values_list( @@ -239,14 +239,14 @@ def remove_list_task(list_id, re_add=False): add_list_task.delay(list_id) -@app.task(queue=HIGH) +@app.task(queue=LISTS) def add_list_task(list_id): """add a list to any stream it should be in""" book_list = models.List.objects.get(id=list_id) ListsStream().add_list(book_list) -@app.task(queue=MEDIUM) +@app.task(queue=LISTS) def remove_user_lists_task(viewer_id, user_id, exclude_privacy=None): """remove all lists by a user from a viewer's stream""" viewer = models.User.objects.get(id=viewer_id) @@ -254,7 +254,7 @@ def remove_user_lists_task(viewer_id, user_id, exclude_privacy=None): ListsStream().remove_user_lists(viewer, user, exclude_privacy=exclude_privacy) -@app.task(queue=MEDIUM) +@app.task(queue=LISTS) def add_user_lists_task(viewer_id, user_id): """add all lists by a user to a viewer's stream""" viewer = models.User.objects.get(id=viewer_id) diff --git a/bookwyrm/models/activitypub_mixin.py b/bookwyrm/models/activitypub_mixin.py index d1ca3747a..4b53c6e87 100644 --- a/bookwyrm/models/activitypub_mixin.py +++ b/bookwyrm/models/activitypub_mixin.py @@ -21,7 +21,7 @@ from django.utils.http import http_date from bookwyrm import activitypub from bookwyrm.settings import USER_AGENT, PAGE_LENGTH from bookwyrm.signatures import make_signature, make_digest -from bookwyrm.tasks import app, MEDIUM, BROADCAST +from bookwyrm.tasks import app, BROADCAST from bookwyrm.models.fields import ImageField, ManyToManyField logger = logging.getLogger(__name__) @@ -379,7 +379,7 @@ class CollectionItemMixin(ActivitypubMixin): activity_serializer = activitypub.CollectionItem - def broadcast(self, activity, sender, software="bookwyrm", queue=MEDIUM): + def broadcast(self, activity, sender, software="bookwyrm", queue=BROADCAST): """only send book collection updates to other bookwyrm instances""" super().broadcast(activity, sender, software=software, queue=queue) @@ -400,7 +400,7 @@ class CollectionItemMixin(ActivitypubMixin): return [] return [collection_field.user] - def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs): + def save(self, *args, broadcast=True, priority=BROADCAST, **kwargs): """broadcast updated""" # first off, we want to save normally no matter what super().save(*args, **kwargs) @@ -444,7 +444,7 @@ class CollectionItemMixin(ActivitypubMixin): class ActivityMixin(ActivitypubMixin): """add this mixin for models that are AP serializable""" - def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs): + def save(self, *args, broadcast=True, priority=BROADCAST, **kwargs): """broadcast activity""" super().save(*args, **kwargs) user = self.user if hasattr(self, "user") else self.user_subject diff --git a/bookwyrm/models/antispam.py b/bookwyrm/models/antispam.py index 1e20df340..94d978ec4 100644 --- a/bookwyrm/models/antispam.py +++ b/bookwyrm/models/antispam.py @@ -8,7 +8,7 @@ from django.db import models, transaction from django.db.models import Q from django.utils.translation import gettext_lazy as _ -from bookwyrm.tasks import app, LOW +from bookwyrm.tasks import app, MISC from .base_model import BookWyrmModel from .user import User @@ -65,7 +65,7 @@ class AutoMod(AdminModel): created_by = models.ForeignKey("User", on_delete=models.PROTECT) -@app.task(queue=LOW) +@app.task(queue=MISC) def automod_task(): """Create reports""" if not AutoMod.objects.exists(): diff --git a/bookwyrm/models/import_job.py b/bookwyrm/models/import_job.py index a489edb7c..bb5144297 100644 --- a/bookwyrm/models/import_job.py +++ b/bookwyrm/models/import_job.py @@ -19,7 +19,7 @@ from bookwyrm.models import ( Review, ReviewRating, ) -from bookwyrm.tasks import app, LOW, IMPORTS +from bookwyrm.tasks import app, IMPORT_TRIGGERED, IMPORTS from .fields import PrivacyLevels @@ -399,7 +399,7 @@ def handle_imported_book(item): shelved_date = item.date_added or timezone.now() ShelfBook( book=item.book, shelf=desired_shelf, user=user, shelved_date=shelved_date - ).save(priority=LOW) + ).save(priority=IMPORT_TRIGGERED) for read in item.reads: # check for an existing readthrough with the same dates @@ -441,7 +441,7 @@ def handle_imported_book(item): published_date=published_date_guess, privacy=job.privacy, ) - review.save(software="bookwyrm", priority=LOW) + review.save(software="bookwyrm", priority=IMPORT_TRIGGERED) else: # just a rating review = ReviewRating.objects.filter( @@ -458,7 +458,7 @@ def handle_imported_book(item): published_date=published_date_guess, privacy=job.privacy, ) - review.save(software="bookwyrm", priority=LOW) + review.save(software="bookwyrm", priority=IMPORT_TRIGGERED) # only broadcast this review to other bookwyrm instances item.linked_review = review diff --git a/bookwyrm/models/relationship.py b/bookwyrm/models/relationship.py index 4754bea36..7af6ad5ab 100644 --- a/bookwyrm/models/relationship.py +++ b/bookwyrm/models/relationship.py @@ -4,7 +4,6 @@ from django.db import models, transaction, IntegrityError from django.db.models import Q from bookwyrm import activitypub -from bookwyrm.tasks import HIGH from .activitypub_mixin import ActivitypubMixin, ActivityMixin from .activitypub_mixin import generate_activity from .base_model import BookWyrmModel @@ -142,7 +141,7 @@ class UserFollowRequest(ActivitypubMixin, UserRelationship): # a local user is following a remote user if broadcast and self.user_subject.local and not self.user_object.local: - self.broadcast(self.to_activity(), self.user_subject, queue=HIGH) + self.broadcast(self.to_activity(), self.user_subject) if self.user_object.local: manually_approves = self.user_object.manually_approves_followers @@ -166,7 +165,7 @@ class UserFollowRequest(ActivitypubMixin, UserRelationship): actor=self.user_object.remote_id, object=self.to_activity(), ).serialize() - self.broadcast(activity, user, queue=HIGH) + self.broadcast(activity, user) if broadcast_only: return @@ -187,7 +186,7 @@ class UserFollowRequest(ActivitypubMixin, UserRelationship): actor=self.user_object.remote_id, object=self.to_activity(), ).serialize() - self.broadcast(activity, self.user_object, queue=HIGH) + self.broadcast(activity, self.user_object) self.delete() diff --git a/bookwyrm/models/shelf.py b/bookwyrm/models/shelf.py index c52cb6ab8..3d92f8d43 100644 --- a/bookwyrm/models/shelf.py +++ b/bookwyrm/models/shelf.py @@ -7,7 +7,7 @@ from django.utils import timezone from bookwyrm import activitypub from bookwyrm.settings import DOMAIN -from bookwyrm.tasks import LOW +from bookwyrm.tasks import BROADCAST from .activitypub_mixin import CollectionItemMixin, OrderedCollectionMixin from .base_model import BookWyrmModel from . import fields @@ -40,7 +40,7 @@ class Shelf(OrderedCollectionMixin, BookWyrmModel): activity_serializer = activitypub.Shelf - def save(self, *args, priority=LOW, **kwargs): + def save(self, *args, priority=BROADCAST, **kwargs): """set the identifier""" super().save(*args, priority=priority, **kwargs) if not self.identifier: @@ -100,7 +100,7 @@ class ShelfBook(CollectionItemMixin, BookWyrmModel): activity_serializer = activitypub.ShelfItem collection_field = "shelf" - def save(self, *args, priority=LOW, **kwargs): + def save(self, *args, priority=BROADCAST, **kwargs): if not self.user: self.user = self.shelf.user if self.id and self.user.local: diff --git a/bookwyrm/models/user.py b/bookwyrm/models/user.py index f39468246..9253aa109 100644 --- a/bookwyrm/models/user.py +++ b/bookwyrm/models/user.py @@ -20,7 +20,7 @@ from bookwyrm.models.status import Status from bookwyrm.preview_images import generate_user_preview_image_task from bookwyrm.settings import DOMAIN, ENABLE_PREVIEW_IMAGES, USE_HTTPS, LANGUAGES from bookwyrm.signatures import create_key_pair -from bookwyrm.tasks import app, LOW +from bookwyrm.tasks import app, MISC from bookwyrm.utils import regex from .activitypub_mixin import OrderedCollectionPageMixin, ActivitypubMixin from .base_model import BookWyrmModel, DeactivationReason, new_access_code @@ -469,7 +469,7 @@ class KeyPair(ActivitypubMixin, BookWyrmModel): return super().save(*args, **kwargs) -@app.task(queue=LOW) +@app.task(queue=MISC) def set_remote_server(user_id, allow_external_connections=False): """figure out the user's remote server in the background""" user = User.objects.get(id=user_id) @@ -528,7 +528,7 @@ def get_or_create_remote_server( return server -@app.task(queue=LOW) +@app.task(queue=MISC) def get_remote_reviews(outbox): """ingest reviews by a new remote bookwyrm user""" outbox_page = outbox + "?page=true&type=Review" diff --git a/bookwyrm/preview_images.py b/bookwyrm/preview_images.py index 549e12472..aba372abc 100644 --- a/bookwyrm/preview_images.py +++ b/bookwyrm/preview_images.py @@ -16,7 +16,7 @@ from django.core.files.storage import default_storage from django.db.models import Avg from bookwyrm import models, settings -from bookwyrm.tasks import app, LOW +from bookwyrm.tasks import app, IMAGES logger = logging.getLogger(__name__) @@ -420,7 +420,7 @@ def save_and_cleanup(image, instance=None): # pylint: disable=invalid-name -@app.task(queue=LOW) +@app.task(queue=IMAGES) def generate_site_preview_image_task(): """generate preview_image for the website""" if not settings.ENABLE_PREVIEW_IMAGES: @@ -445,7 +445,7 @@ def generate_site_preview_image_task(): # pylint: disable=invalid-name -@app.task(queue=LOW) +@app.task(queue=IMAGES) def generate_edition_preview_image_task(book_id): """generate preview_image for a book""" if not settings.ENABLE_PREVIEW_IMAGES: @@ -470,7 +470,7 @@ def generate_edition_preview_image_task(book_id): save_and_cleanup(image, instance=book) -@app.task(queue=LOW) +@app.task(queue=IMAGES) def generate_user_preview_image_task(user_id): """generate preview_image for a user""" if not settings.ENABLE_PREVIEW_IMAGES: @@ -496,7 +496,7 @@ def generate_user_preview_image_task(user_id): save_and_cleanup(image, instance=user) -@app.task(queue=LOW) +@app.task(queue=IMAGES) def remove_user_preview_image_task(user_id): """remove preview_image for a user""" if not settings.ENABLE_PREVIEW_IMAGES: diff --git a/bookwyrm/suggested_users.py b/bookwyrm/suggested_users.py index 05e05891c..d897feff7 100644 --- a/bookwyrm/suggested_users.py +++ b/bookwyrm/suggested_users.py @@ -8,7 +8,7 @@ from opentelemetry import trace from bookwyrm import models from bookwyrm.redis_store import RedisStore, r -from bookwyrm.tasks import app, LOW, MEDIUM +from bookwyrm.tasks import app, SUGGESTED_USERS from bookwyrm.telemetry import open_telemetry @@ -244,20 +244,20 @@ def domain_level_update(sender, instance, created, update_fields=None, **kwargs) # ------------------- TASKS -@app.task(queue=LOW) +@app.task(queue=SUGGESTED_USERS) def rerank_suggestions_task(user_id): """do the hard work in celery""" suggested_users.rerank_user_suggestions(user_id) -@app.task(queue=LOW) +@app.task(queue=SUGGESTED_USERS) def rerank_user_task(user_id, update_only=False): """do the hard work in celery""" user = models.User.objects.get(id=user_id) suggested_users.rerank_obj(user, update_only=update_only) -@app.task(queue=LOW) +@app.task(queue=SUGGESTED_USERS) def remove_user_task(user_id): """do the hard work in celery""" user = models.User.objects.get(id=user_id) @@ -266,14 +266,14 @@ def remove_user_task(user_id): ) -@app.task(queue=MEDIUM) +@app.task(queue=SUGGESTED_USERS) def remove_suggestion_task(user_id, suggested_user_id): """remove a specific user from a specific user's suggestions""" suggested_user = models.User.objects.get(id=suggested_user_id) suggested_users.remove_suggestion(user_id, suggested_user) -@app.task(queue=LOW) +@app.task(queue=SUGGESTED_USERS) def bulk_remove_instance_task(instance_id): """remove a bunch of users from recs""" for user in models.User.objects.filter(federated_server__id=instance_id): @@ -282,7 +282,7 @@ def bulk_remove_instance_task(instance_id): ) -@app.task(queue=LOW) +@app.task(queue=SUGGESTED_USERS) def bulk_add_instance_task(instance_id): """remove a bunch of users from recs""" for user in models.User.objects.filter(federated_server__id=instance_id): diff --git a/bookwyrm/tasks.py b/bookwyrm/tasks.py index 91977afda..79e1b6340 100644 --- a/bookwyrm/tasks.py +++ b/bookwyrm/tasks.py @@ -10,11 +10,19 @@ app = Celery( "tasks", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND ) -# priorities +# priorities - for backwards compatibility, will be removed next release LOW = "low_priority" MEDIUM = "medium_priority" HIGH = "high_priority" -# import items get their own queue because they're such a pain in the ass + +STREAMS = "streams" +IMAGES = "images" +SUGGESTED_USERS = "suggested_users" +EMAIL = "email" +CONNECTORS = "connectors" +LISTS = "lists" +INBOX = "inbox" IMPORTS = "imports" -# I keep making more queues?? this one broadcasting out +IMPORT_TRIGGERED = "import_triggered" BROADCAST = "broadcast" +MISC = "misc" diff --git a/bookwyrm/templates/settings/celery.html b/bookwyrm/templates/settings/celery.html index 5f79dfd9d..2f4a36ce9 100644 --- a/bookwyrm/templates/settings/celery.html +++ b/bookwyrm/templates/settings/celery.html @@ -21,6 +21,76 @@

{% trans "Queues" %}

+
+
+

{% trans "Streams" %}

+

{{ queues.streams|intcomma }}

+
+
+
+
+

{% trans "Broadcasts" %}

+

{{ queues.broadcast|intcomma }}

+
+
+
+
+

{% trans "Inbox" %}

+

{{ queues.inbox|intcomma }}

+
+
+ +
+
+

{% trans "Imports" %}

+

{{ queues.imports|intcomma }}

+
+
+
+
+

{% trans "Import triggered" %}

+

{{ queues.import_triggered|intcomma }}

+
+
+
+
+

{% trans "Connectors" %}

+

{{ queues.connectors|intcomma }}

+
+
+ +
+
+

{% trans "Images" %}

+

{{ queues.images|intcomma }}

+
+
+
+
+

{% trans "Suggested Users" %}

+

{{ queues.suggested_users|intcomma }}

+
+
+ +
+
+

{% trans "Lists" %}

+

{{ queues.lists|intcomma }}

+
+
+
+
+

{% trans "Email" %}

+

{{ queues.email|intcomma }}

+
+
+
+
+

{% trans "Misc" %}

+

{{ queues.misc|intcomma }}

+
+
+

{% trans "Low priority" %}

@@ -39,18 +109,6 @@

{{ queues.high_priority|intcomma }}

-
-
-

{% trans "Imports" %}

-

{{ queues.imports|intcomma }}

-
-
-
-
-

{% trans "Broadcasts" %}

-

{{ queues.broadcast|intcomma }}

-
-
{% else %} diff --git a/bookwyrm/tests/activitystreams/test_signals.py b/bookwyrm/tests/activitystreams/test_signals.py index f7c6c20bb..210d4d5df 100644 --- a/bookwyrm/tests/activitystreams/test_signals.py +++ b/bookwyrm/tests/activitystreams/test_signals.py @@ -64,7 +64,7 @@ class ActivitystreamsSignals(TestCase): self.assertEqual(mock.call_count, 1) args = mock.call_args[1] self.assertEqual(args["args"][0], status.id) - self.assertEqual(args["queue"], "high_priority") + self.assertEqual(args["queue"], "streams") def test_add_status_on_create_created_low_priority(self, *_): """a new statuses has entered""" @@ -82,7 +82,7 @@ class ActivitystreamsSignals(TestCase): self.assertEqual(mock.call_count, 1) args = mock.call_args[1] self.assertEqual(args["args"][0], status.id) - self.assertEqual(args["queue"], "low_priority") + self.assertEqual(args["queue"], "import_triggered") # published later than yesterday status = models.Status.objects.create( @@ -97,7 +97,7 @@ class ActivitystreamsSignals(TestCase): self.assertEqual(mock.call_count, 1) args = mock.call_args[1] self.assertEqual(args["args"][0], status.id) - self.assertEqual(args["queue"], "low_priority") + self.assertEqual(args["queue"], "import_triggered") def test_populate_streams_on_account_create_command(self, *_): """create streams for a user""" diff --git a/bookwyrm/tests/importers/test_importer.py b/bookwyrm/tests/importers/test_importer.py index 51346f1a1..f48b97993 100644 --- a/bookwyrm/tests/importers/test_importer.py +++ b/bookwyrm/tests/importers/test_importer.py @@ -145,7 +145,7 @@ class GenericImporter(TestCase): ) as mock: import_item_task(import_item.id) kwargs = mock.call_args.kwargs - self.assertEqual(kwargs["queue"], "low_priority") + self.assertEqual(kwargs["queue"], "import_triggered") import_item.refresh_from_db() def test_complete_job(self, *_): diff --git a/bookwyrm/views/admin/celery_status.py b/bookwyrm/views/admin/celery_status.py index 392d7c471..cd8b85b6d 100644 --- a/bookwyrm/views/admin/celery_status.py +++ b/bookwyrm/views/admin/celery_status.py @@ -11,7 +11,23 @@ from django import forms import redis from celerywyrm import settings -from bookwyrm.tasks import app as celery, LOW, MEDIUM, HIGH, IMPORTS, BROADCAST +from bookwyrm.tasks import ( + app as celery, + LOW, + MEDIUM, + HIGH, + STREAMS, + IMAGES, + SUGGESTED_USERS, + EMAIL, + CONNECTORS, + LISTS, + INBOX, + IMPORTS, + IMPORT_TRIGGERED, + BROADCAST, + MISC, +) r = redis.from_url(settings.REDIS_BROKER_URL) @@ -41,8 +57,17 @@ class CeleryStatus(View): LOW: r.llen(LOW), MEDIUM: r.llen(MEDIUM), HIGH: r.llen(HIGH), + STREAMS: r.llen(STREAMS), + IMAGES: r.llen(IMAGES), + SUGGESTED_USERS: r.llen(SUGGESTED_USERS), + EMAIL: r.llen(EMAIL), + CONNECTORS: r.llen(CONNECTORS), + LISTS: r.llen(LISTS), + INBOX: r.llen(INBOX), IMPORTS: r.llen(IMPORTS), + IMPORT_TRIGGERED: r.llen(IMPORT_TRIGGERED), BROADCAST: r.llen(BROADCAST), + MISC: r.llen(MISC), } # pylint: disable=broad-except except Exception as err: @@ -88,8 +113,17 @@ class ClearCeleryForm(forms.Form): (LOW, "Low prioirty"), (MEDIUM, "Medium priority"), (HIGH, "High priority"), + (STREAMS, "Streams"), + (IMAGES, "Images"), + (SUGGESTED_USERS, "Suggested users"), + (EMAIL, "Email"), + (CONNECTORS, "Connectors"), + (LISTS, "Lists"), + (INBOX, "Inbox"), (IMPORTS, "Imports"), + (IMPORT_TRIGGERED, "Import triggered"), (BROADCAST, "Broadcasts"), + (MISC, "Misc"), ], widget=forms.CheckboxSelectMultiple, ) diff --git a/bookwyrm/views/inbox.py b/bookwyrm/views/inbox.py index 52d230524..4b95469d9 100644 --- a/bookwyrm/views/inbox.py +++ b/bookwyrm/views/inbox.py @@ -13,7 +13,7 @@ from django.views import View from django.views.decorators.csrf import csrf_exempt from bookwyrm import activitypub, models -from bookwyrm.tasks import app, MEDIUM, HIGH +from bookwyrm.tasks import app, INBOX from bookwyrm.signatures import Signature from bookwyrm.utils import regex @@ -59,11 +59,7 @@ class Inbox(View): return HttpResponse() return HttpResponse(status=401) - # Make activities relating to follow/unfollow a high priority - high = ["Follow", "Accept", "Reject", "Block", "Unblock", "Undo"] - - priority = HIGH if activity_json["type"] in high else MEDIUM - sometimes_async_activity_task(activity_json, queue=priority) + sometimes_async_activity_task(activity_json) return HttpResponse() @@ -101,7 +97,7 @@ def raise_is_blocked_activity(activity_json): raise PermissionDenied() -def sometimes_async_activity_task(activity_json, queue=MEDIUM): +def sometimes_async_activity_task(activity_json): """Sometimes we can effectively respond to a request without queuing a new task, and whenever that is possible, we should do it.""" activity = activitypub.parse(activity_json) @@ -111,10 +107,10 @@ def sometimes_async_activity_task(activity_json, queue=MEDIUM): activity.action(allow_external_connections=False) except activitypub.ActivitySerializerError: # if that doesn't work, run it asynchronously - activity_task.apply_async(args=(activity_json,), queue=queue) + activity_task.apply_async(args=(activity_json,)) -@app.task(queue=MEDIUM) +@app.task(queue=INBOX) def activity_task(activity_json): """do something with this json we think is legit""" # lets see if the activitypub module can make sense of this json diff --git a/contrib/systemd/bookwyrm-worker.service b/contrib/systemd/bookwyrm-worker.service index 63be04584..ebba8b6ca 100644 --- a/contrib/systemd/bookwyrm-worker.service +++ b/contrib/systemd/bookwyrm-worker.service @@ -6,7 +6,7 @@ After=network.target postgresql.service redis.service User=bookwyrm Group=bookwyrm WorkingDirectory=/opt/bookwyrm/ -ExecStart=/opt/bookwyrm/venv/bin/celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,imports,broadcast +ExecStart=/opt/bookwyrm/venv/bin/celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,streams,images,suggested_users,email,connectors,lists,inbox,imports,import_triggered,broadcast,misc StandardOutput=journal StandardError=inherit diff --git a/docker-compose.yml b/docker-compose.yml index 4389c08aa..800690206 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -62,7 +62,8 @@ services: build: . networks: - main - command: celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,imports,broadcast + command: celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,streams,images,suggested_users,email,connectors,lists,inbox,imports,import_triggered,broadcast,misc + volumes: - .:/app - static_volume:/app/static