Switch from priority queues to function-based queues

Fixes: #2907
This commit is contained in:
Wesley Aptekar-Cassels 2023-07-20 00:16:38 -04:00
parent 107f5b38ca
commit 3e78e398c0
21 changed files with 183 additions and 87 deletions

View file

@ -12,7 +12,7 @@ from bookwyrm import models
from bookwyrm.connectors import ConnectorException, get_data from bookwyrm.connectors import ConnectorException, get_data
from bookwyrm.signatures import make_signature from bookwyrm.signatures import make_signature
from bookwyrm.settings import DOMAIN, INSTANCE_ACTOR_USERNAME from bookwyrm.settings import DOMAIN, INSTANCE_ACTOR_USERNAME
from bookwyrm.tasks import app, MEDIUM from bookwyrm.tasks import app, MISC
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -241,7 +241,7 @@ class ActivityObject:
return data return data
@app.task(queue=MEDIUM) @app.task(queue=MISC)
@transaction.atomic @transaction.atomic
def set_related_field( def set_related_field(
model_name, origin_model_name, related_field_name, related_remote_id, data model_name, origin_model_name, related_field_name, related_remote_id, data

View file

@ -8,7 +8,7 @@ from opentelemetry import trace
from bookwyrm import models from bookwyrm import models
from bookwyrm.redis_store import RedisStore, r from bookwyrm.redis_store import RedisStore, r
from bookwyrm.tasks import app, LOW, MEDIUM, HIGH from bookwyrm.tasks import app, STREAMS, IMPORT_TRIGGERED
from bookwyrm.telemetry import open_telemetry from bookwyrm.telemetry import open_telemetry
@ -343,7 +343,7 @@ def add_status_on_create(sender, instance, created, *args, **kwargs):
def add_status_on_create_command(sender, instance, created): def add_status_on_create_command(sender, instance, created):
"""runs this code only after the database commit completes""" """runs this code only after the database commit completes"""
priority = HIGH priority = STREAMS
# check if this is an old status, de-prioritize if so # check if this is an old status, de-prioritize if so
# (this will happen if federation is very slow, or, more expectedly, on csv import) # (this will happen if federation is very slow, or, more expectedly, on csv import)
if instance.published_date < timezone.now() - timedelta( if instance.published_date < timezone.now() - timedelta(
@ -353,7 +353,7 @@ def add_status_on_create_command(sender, instance, created):
if instance.user.local: if instance.user.local:
return return
# an out of date remote status is a low priority but should be added # an out of date remote status is a low priority but should be added
priority = LOW priority = IMPORT_TRIGGERED
add_status_task.apply_async( add_status_task.apply_async(
args=(instance.id,), args=(instance.id,),
@ -497,7 +497,7 @@ def remove_statuses_on_unshelve(sender, instance, *args, **kwargs):
# ---- TASKS # ---- TASKS
@app.task(queue=LOW) @app.task(queue=STREAMS)
def add_book_statuses_task(user_id, book_id): def add_book_statuses_task(user_id, book_id):
"""add statuses related to a book on shelve""" """add statuses related to a book on shelve"""
user = models.User.objects.get(id=user_id) user = models.User.objects.get(id=user_id)
@ -505,7 +505,7 @@ def add_book_statuses_task(user_id, book_id):
BooksStream().add_book_statuses(user, book) BooksStream().add_book_statuses(user, book)
@app.task(queue=LOW) @app.task(queue=STREAMS)
def remove_book_statuses_task(user_id, book_id): def remove_book_statuses_task(user_id, book_id):
"""remove statuses about a book from a user's books feed""" """remove statuses about a book from a user's books feed"""
user = models.User.objects.get(id=user_id) user = models.User.objects.get(id=user_id)
@ -513,7 +513,7 @@ def remove_book_statuses_task(user_id, book_id):
BooksStream().remove_book_statuses(user, book) BooksStream().remove_book_statuses(user, book)
@app.task(queue=MEDIUM) @app.task(queue=STREAMS)
def populate_stream_task(stream, user_id): def populate_stream_task(stream, user_id):
"""background task for populating an empty activitystream""" """background task for populating an empty activitystream"""
user = models.User.objects.get(id=user_id) user = models.User.objects.get(id=user_id)
@ -521,7 +521,7 @@ def populate_stream_task(stream, user_id):
stream.populate_streams(user) stream.populate_streams(user)
@app.task(queue=MEDIUM) @app.task(queue=STREAMS)
def remove_status_task(status_ids): def remove_status_task(status_ids):
"""remove a status from any stream it might be in""" """remove a status from any stream it might be in"""
# this can take an id or a list of ids # this can take an id or a list of ids
@ -536,7 +536,7 @@ def remove_status_task(status_ids):
) )
@app.task(queue=HIGH) @app.task(queue=STREAMS)
def add_status_task(status_id, increment_unread=False): def add_status_task(status_id, increment_unread=False):
"""add a status to any stream it should be in""" """add a status to any stream it should be in"""
status = models.Status.objects.select_subclasses().get(id=status_id) status = models.Status.objects.select_subclasses().get(id=status_id)
@ -548,7 +548,7 @@ def add_status_task(status_id, increment_unread=False):
stream.add_status(status, increment_unread=increment_unread) stream.add_status(status, increment_unread=increment_unread)
@app.task(queue=MEDIUM) @app.task(queue=STREAMS)
def remove_user_statuses_task(viewer_id, user_id, stream_list=None): def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
"""remove all statuses by a user from a viewer's stream""" """remove all statuses by a user from a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values() stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
@ -558,7 +558,7 @@ def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
stream.remove_user_statuses(viewer, user) stream.remove_user_statuses(viewer, user)
@app.task(queue=MEDIUM) @app.task(queue=STREAMS)
def add_user_statuses_task(viewer_id, user_id, stream_list=None): def add_user_statuses_task(viewer_id, user_id, stream_list=None):
"""add all statuses by a user to a viewer's stream""" """add all statuses by a user to a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values() stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
@ -568,7 +568,7 @@ def add_user_statuses_task(viewer_id, user_id, stream_list=None):
stream.add_user_statuses(viewer, user) stream.add_user_statuses(viewer, user)
@app.task(queue=MEDIUM) @app.task(queue=STREAMS)
def handle_boost_task(boost_id): def handle_boost_task(boost_id):
"""remove the original post and other, earlier boosts""" """remove the original post and other, earlier boosts"""
instance = models.Status.objects.get(id=boost_id) instance = models.Status.objects.get(id=boost_id)

View file

@ -13,7 +13,7 @@ from requests import HTTPError
from bookwyrm import book_search, models from bookwyrm import book_search, models
from bookwyrm.settings import SEARCH_TIMEOUT from bookwyrm.settings import SEARCH_TIMEOUT
from bookwyrm.tasks import app, LOW from bookwyrm.tasks import app, CONNECTORS
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -109,7 +109,7 @@ def get_or_create_connector(remote_id):
return load_connector(connector_info) return load_connector(connector_info)
@app.task(queue=LOW) @app.task(queue=CONNECTORS)
def load_more_data(connector_id, book_id): def load_more_data(connector_id, book_id):
"""background the work of getting all 10,000 editions of LoTR""" """background the work of getting all 10,000 editions of LoTR"""
connector_info = models.Connector.objects.get(id=connector_id) connector_info = models.Connector.objects.get(id=connector_id)
@ -118,7 +118,7 @@ def load_more_data(connector_id, book_id):
connector.expand_book_data(book) connector.expand_book_data(book)
@app.task(queue=LOW) @app.task(queue=CONNECTORS)
def create_edition_task(connector_id, work_id, data): def create_edition_task(connector_id, work_id, data):
"""separate task for each of the 10,000 editions of LoTR""" """separate task for each of the 10,000 editions of LoTR"""
connector_info = models.Connector.objects.get(id=connector_id) connector_info = models.Connector.objects.get(id=connector_id)

View file

@ -3,7 +3,7 @@ from django.core.mail import EmailMultiAlternatives
from django.template.loader import get_template from django.template.loader import get_template
from bookwyrm import models, settings from bookwyrm import models, settings
from bookwyrm.tasks import app, HIGH from bookwyrm.tasks import app, EMAIL
from bookwyrm.settings import DOMAIN from bookwyrm.settings import DOMAIN
@ -75,7 +75,7 @@ def format_email(email_name, data):
return (subject, html_content, text_content) return (subject, html_content, text_content)
@app.task(queue=HIGH) @app.task(queue=EMAIL)
def send_email(recipient, subject, html_content, text_content): def send_email(recipient, subject, html_content, text_content):
"""use a task to send the email""" """use a task to send the email"""
email = EmailMultiAlternatives( email = EmailMultiAlternatives(

View file

@ -5,7 +5,7 @@ from django.db.models import signals, Count, Q
from bookwyrm import models from bookwyrm import models
from bookwyrm.redis_store import RedisStore from bookwyrm.redis_store import RedisStore
from bookwyrm.tasks import app, MEDIUM, HIGH from bookwyrm.tasks import app, LISTS
class ListsStream(RedisStore): class ListsStream(RedisStore):
@ -217,14 +217,14 @@ def add_list_on_account_create_command(user_id):
# ---- TASKS # ---- TASKS
@app.task(queue=MEDIUM) @app.task(queue=LISTS)
def populate_lists_task(user_id): def populate_lists_task(user_id):
"""background task for populating an empty list stream""" """background task for populating an empty list stream"""
user = models.User.objects.get(id=user_id) user = models.User.objects.get(id=user_id)
ListsStream().populate_lists(user) ListsStream().populate_lists(user)
@app.task(queue=MEDIUM) @app.task(queue=LISTS)
def remove_list_task(list_id, re_add=False): def remove_list_task(list_id, re_add=False):
"""remove a list from any stream it might be in""" """remove a list from any stream it might be in"""
stores = models.User.objects.filter(local=True, is_active=True).values_list( stores = models.User.objects.filter(local=True, is_active=True).values_list(
@ -239,14 +239,14 @@ def remove_list_task(list_id, re_add=False):
add_list_task.delay(list_id) add_list_task.delay(list_id)
@app.task(queue=HIGH) @app.task(queue=LISTS)
def add_list_task(list_id): def add_list_task(list_id):
"""add a list to any stream it should be in""" """add a list to any stream it should be in"""
book_list = models.List.objects.get(id=list_id) book_list = models.List.objects.get(id=list_id)
ListsStream().add_list(book_list) ListsStream().add_list(book_list)
@app.task(queue=MEDIUM) @app.task(queue=LISTS)
def remove_user_lists_task(viewer_id, user_id, exclude_privacy=None): def remove_user_lists_task(viewer_id, user_id, exclude_privacy=None):
"""remove all lists by a user from a viewer's stream""" """remove all lists by a user from a viewer's stream"""
viewer = models.User.objects.get(id=viewer_id) viewer = models.User.objects.get(id=viewer_id)
@ -254,7 +254,7 @@ def remove_user_lists_task(viewer_id, user_id, exclude_privacy=None):
ListsStream().remove_user_lists(viewer, user, exclude_privacy=exclude_privacy) ListsStream().remove_user_lists(viewer, user, exclude_privacy=exclude_privacy)
@app.task(queue=MEDIUM) @app.task(queue=LISTS)
def add_user_lists_task(viewer_id, user_id): def add_user_lists_task(viewer_id, user_id):
"""add all lists by a user to a viewer's stream""" """add all lists by a user to a viewer's stream"""
viewer = models.User.objects.get(id=viewer_id) viewer = models.User.objects.get(id=viewer_id)

View file

@ -21,7 +21,7 @@ from django.utils.http import http_date
from bookwyrm import activitypub from bookwyrm import activitypub
from bookwyrm.settings import USER_AGENT, PAGE_LENGTH from bookwyrm.settings import USER_AGENT, PAGE_LENGTH
from bookwyrm.signatures import make_signature, make_digest from bookwyrm.signatures import make_signature, make_digest
from bookwyrm.tasks import app, MEDIUM, BROADCAST from bookwyrm.tasks import app, BROADCAST
from bookwyrm.models.fields import ImageField, ManyToManyField from bookwyrm.models.fields import ImageField, ManyToManyField
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -379,7 +379,7 @@ class CollectionItemMixin(ActivitypubMixin):
activity_serializer = activitypub.CollectionItem activity_serializer = activitypub.CollectionItem
def broadcast(self, activity, sender, software="bookwyrm", queue=MEDIUM): def broadcast(self, activity, sender, software="bookwyrm", queue=BROADCAST):
"""only send book collection updates to other bookwyrm instances""" """only send book collection updates to other bookwyrm instances"""
super().broadcast(activity, sender, software=software, queue=queue) super().broadcast(activity, sender, software=software, queue=queue)
@ -400,7 +400,7 @@ class CollectionItemMixin(ActivitypubMixin):
return [] return []
return [collection_field.user] return [collection_field.user]
def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs): def save(self, *args, broadcast=True, priority=BROADCAST, **kwargs):
"""broadcast updated""" """broadcast updated"""
# first off, we want to save normally no matter what # first off, we want to save normally no matter what
super().save(*args, **kwargs) super().save(*args, **kwargs)
@ -444,7 +444,7 @@ class CollectionItemMixin(ActivitypubMixin):
class ActivityMixin(ActivitypubMixin): class ActivityMixin(ActivitypubMixin):
"""add this mixin for models that are AP serializable""" """add this mixin for models that are AP serializable"""
def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs): def save(self, *args, broadcast=True, priority=BROADCAST, **kwargs):
"""broadcast activity""" """broadcast activity"""
super().save(*args, **kwargs) super().save(*args, **kwargs)
user = self.user if hasattr(self, "user") else self.user_subject user = self.user if hasattr(self, "user") else self.user_subject

View file

@ -8,7 +8,7 @@ from django.db import models, transaction
from django.db.models import Q from django.db.models import Q
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from bookwyrm.tasks import app, LOW from bookwyrm.tasks import app, MISC
from .base_model import BookWyrmModel from .base_model import BookWyrmModel
from .user import User from .user import User
@ -65,7 +65,7 @@ class AutoMod(AdminModel):
created_by = models.ForeignKey("User", on_delete=models.PROTECT) created_by = models.ForeignKey("User", on_delete=models.PROTECT)
@app.task(queue=LOW) @app.task(queue=MISC)
def automod_task(): def automod_task():
"""Create reports""" """Create reports"""
if not AutoMod.objects.exists(): if not AutoMod.objects.exists():

View file

@ -19,7 +19,7 @@ from bookwyrm.models import (
Review, Review,
ReviewRating, ReviewRating,
) )
from bookwyrm.tasks import app, LOW, IMPORTS from bookwyrm.tasks import app, IMPORT_TRIGGERED, IMPORTS
from .fields import PrivacyLevels from .fields import PrivacyLevels
@ -399,7 +399,7 @@ def handle_imported_book(item):
shelved_date = item.date_added or timezone.now() shelved_date = item.date_added or timezone.now()
ShelfBook( ShelfBook(
book=item.book, shelf=desired_shelf, user=user, shelved_date=shelved_date book=item.book, shelf=desired_shelf, user=user, shelved_date=shelved_date
).save(priority=LOW) ).save(priority=IMPORT_TRIGGERED)
for read in item.reads: for read in item.reads:
# check for an existing readthrough with the same dates # check for an existing readthrough with the same dates
@ -441,7 +441,7 @@ def handle_imported_book(item):
published_date=published_date_guess, published_date=published_date_guess,
privacy=job.privacy, privacy=job.privacy,
) )
review.save(software="bookwyrm", priority=LOW) review.save(software="bookwyrm", priority=IMPORT_TRIGGERED)
else: else:
# just a rating # just a rating
review = ReviewRating.objects.filter( review = ReviewRating.objects.filter(
@ -458,7 +458,7 @@ def handle_imported_book(item):
published_date=published_date_guess, published_date=published_date_guess,
privacy=job.privacy, privacy=job.privacy,
) )
review.save(software="bookwyrm", priority=LOW) review.save(software="bookwyrm", priority=IMPORT_TRIGGERED)
# only broadcast this review to other bookwyrm instances # only broadcast this review to other bookwyrm instances
item.linked_review = review item.linked_review = review

View file

@ -4,7 +4,6 @@ from django.db import models, transaction, IntegrityError
from django.db.models import Q from django.db.models import Q
from bookwyrm import activitypub from bookwyrm import activitypub
from bookwyrm.tasks import HIGH
from .activitypub_mixin import ActivitypubMixin, ActivityMixin from .activitypub_mixin import ActivitypubMixin, ActivityMixin
from .activitypub_mixin import generate_activity from .activitypub_mixin import generate_activity
from .base_model import BookWyrmModel from .base_model import BookWyrmModel
@ -142,7 +141,7 @@ class UserFollowRequest(ActivitypubMixin, UserRelationship):
# a local user is following a remote user # a local user is following a remote user
if broadcast and self.user_subject.local and not self.user_object.local: if broadcast and self.user_subject.local and not self.user_object.local:
self.broadcast(self.to_activity(), self.user_subject, queue=HIGH) self.broadcast(self.to_activity(), self.user_subject)
if self.user_object.local: if self.user_object.local:
manually_approves = self.user_object.manually_approves_followers manually_approves = self.user_object.manually_approves_followers
@ -166,7 +165,7 @@ class UserFollowRequest(ActivitypubMixin, UserRelationship):
actor=self.user_object.remote_id, actor=self.user_object.remote_id,
object=self.to_activity(), object=self.to_activity(),
).serialize() ).serialize()
self.broadcast(activity, user, queue=HIGH) self.broadcast(activity, user)
if broadcast_only: if broadcast_only:
return return
@ -187,7 +186,7 @@ class UserFollowRequest(ActivitypubMixin, UserRelationship):
actor=self.user_object.remote_id, actor=self.user_object.remote_id,
object=self.to_activity(), object=self.to_activity(),
).serialize() ).serialize()
self.broadcast(activity, self.user_object, queue=HIGH) self.broadcast(activity, self.user_object)
self.delete() self.delete()

View file

@ -7,7 +7,7 @@ from django.utils import timezone
from bookwyrm import activitypub from bookwyrm import activitypub
from bookwyrm.settings import DOMAIN from bookwyrm.settings import DOMAIN
from bookwyrm.tasks import LOW from bookwyrm.tasks import BROADCAST
from .activitypub_mixin import CollectionItemMixin, OrderedCollectionMixin from .activitypub_mixin import CollectionItemMixin, OrderedCollectionMixin
from .base_model import BookWyrmModel from .base_model import BookWyrmModel
from . import fields from . import fields
@ -40,7 +40,7 @@ class Shelf(OrderedCollectionMixin, BookWyrmModel):
activity_serializer = activitypub.Shelf activity_serializer = activitypub.Shelf
def save(self, *args, priority=LOW, **kwargs): def save(self, *args, priority=BROADCAST, **kwargs):
"""set the identifier""" """set the identifier"""
super().save(*args, priority=priority, **kwargs) super().save(*args, priority=priority, **kwargs)
if not self.identifier: if not self.identifier:
@ -100,7 +100,7 @@ class ShelfBook(CollectionItemMixin, BookWyrmModel):
activity_serializer = activitypub.ShelfItem activity_serializer = activitypub.ShelfItem
collection_field = "shelf" collection_field = "shelf"
def save(self, *args, priority=LOW, **kwargs): def save(self, *args, priority=BROADCAST, **kwargs):
if not self.user: if not self.user:
self.user = self.shelf.user self.user = self.shelf.user
if self.id and self.user.local: if self.id and self.user.local:

View file

@ -20,7 +20,7 @@ from bookwyrm.models.status import Status
from bookwyrm.preview_images import generate_user_preview_image_task from bookwyrm.preview_images import generate_user_preview_image_task
from bookwyrm.settings import DOMAIN, ENABLE_PREVIEW_IMAGES, USE_HTTPS, LANGUAGES from bookwyrm.settings import DOMAIN, ENABLE_PREVIEW_IMAGES, USE_HTTPS, LANGUAGES
from bookwyrm.signatures import create_key_pair from bookwyrm.signatures import create_key_pair
from bookwyrm.tasks import app, LOW from bookwyrm.tasks import app, MISC
from bookwyrm.utils import regex from bookwyrm.utils import regex
from .activitypub_mixin import OrderedCollectionPageMixin, ActivitypubMixin from .activitypub_mixin import OrderedCollectionPageMixin, ActivitypubMixin
from .base_model import BookWyrmModel, DeactivationReason, new_access_code from .base_model import BookWyrmModel, DeactivationReason, new_access_code
@ -469,7 +469,7 @@ class KeyPair(ActivitypubMixin, BookWyrmModel):
return super().save(*args, **kwargs) return super().save(*args, **kwargs)
@app.task(queue=LOW) @app.task(queue=MISC)
def set_remote_server(user_id, allow_external_connections=False): def set_remote_server(user_id, allow_external_connections=False):
"""figure out the user's remote server in the background""" """figure out the user's remote server in the background"""
user = User.objects.get(id=user_id) user = User.objects.get(id=user_id)
@ -528,7 +528,7 @@ def get_or_create_remote_server(
return server return server
@app.task(queue=LOW) @app.task(queue=MISC)
def get_remote_reviews(outbox): def get_remote_reviews(outbox):
"""ingest reviews by a new remote bookwyrm user""" """ingest reviews by a new remote bookwyrm user"""
outbox_page = outbox + "?page=true&type=Review" outbox_page = outbox + "?page=true&type=Review"

View file

@ -16,7 +16,7 @@ from django.core.files.storage import default_storage
from django.db.models import Avg from django.db.models import Avg
from bookwyrm import models, settings from bookwyrm import models, settings
from bookwyrm.tasks import app, LOW from bookwyrm.tasks import app, IMAGES
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -420,7 +420,7 @@ def save_and_cleanup(image, instance=None):
# pylint: disable=invalid-name # pylint: disable=invalid-name
@app.task(queue=LOW) @app.task(queue=IMAGES)
def generate_site_preview_image_task(): def generate_site_preview_image_task():
"""generate preview_image for the website""" """generate preview_image for the website"""
if not settings.ENABLE_PREVIEW_IMAGES: if not settings.ENABLE_PREVIEW_IMAGES:
@ -445,7 +445,7 @@ def generate_site_preview_image_task():
# pylint: disable=invalid-name # pylint: disable=invalid-name
@app.task(queue=LOW) @app.task(queue=IMAGES)
def generate_edition_preview_image_task(book_id): def generate_edition_preview_image_task(book_id):
"""generate preview_image for a book""" """generate preview_image for a book"""
if not settings.ENABLE_PREVIEW_IMAGES: if not settings.ENABLE_PREVIEW_IMAGES:
@ -470,7 +470,7 @@ def generate_edition_preview_image_task(book_id):
save_and_cleanup(image, instance=book) save_and_cleanup(image, instance=book)
@app.task(queue=LOW) @app.task(queue=IMAGES)
def generate_user_preview_image_task(user_id): def generate_user_preview_image_task(user_id):
"""generate preview_image for a user""" """generate preview_image for a user"""
if not settings.ENABLE_PREVIEW_IMAGES: if not settings.ENABLE_PREVIEW_IMAGES:
@ -496,7 +496,7 @@ def generate_user_preview_image_task(user_id):
save_and_cleanup(image, instance=user) save_and_cleanup(image, instance=user)
@app.task(queue=LOW) @app.task(queue=IMAGES)
def remove_user_preview_image_task(user_id): def remove_user_preview_image_task(user_id):
"""remove preview_image for a user""" """remove preview_image for a user"""
if not settings.ENABLE_PREVIEW_IMAGES: if not settings.ENABLE_PREVIEW_IMAGES:

View file

@ -8,7 +8,7 @@ from opentelemetry import trace
from bookwyrm import models from bookwyrm import models
from bookwyrm.redis_store import RedisStore, r from bookwyrm.redis_store import RedisStore, r
from bookwyrm.tasks import app, LOW, MEDIUM from bookwyrm.tasks import app, SUGGESTED_USERS
from bookwyrm.telemetry import open_telemetry from bookwyrm.telemetry import open_telemetry
@ -244,20 +244,20 @@ def domain_level_update(sender, instance, created, update_fields=None, **kwargs)
# ------------------- TASKS # ------------------- TASKS
@app.task(queue=LOW) @app.task(queue=SUGGESTED_USERS)
def rerank_suggestions_task(user_id): def rerank_suggestions_task(user_id):
"""do the hard work in celery""" """do the hard work in celery"""
suggested_users.rerank_user_suggestions(user_id) suggested_users.rerank_user_suggestions(user_id)
@app.task(queue=LOW) @app.task(queue=SUGGESTED_USERS)
def rerank_user_task(user_id, update_only=False): def rerank_user_task(user_id, update_only=False):
"""do the hard work in celery""" """do the hard work in celery"""
user = models.User.objects.get(id=user_id) user = models.User.objects.get(id=user_id)
suggested_users.rerank_obj(user, update_only=update_only) suggested_users.rerank_obj(user, update_only=update_only)
@app.task(queue=LOW) @app.task(queue=SUGGESTED_USERS)
def remove_user_task(user_id): def remove_user_task(user_id):
"""do the hard work in celery""" """do the hard work in celery"""
user = models.User.objects.get(id=user_id) user = models.User.objects.get(id=user_id)
@ -266,14 +266,14 @@ def remove_user_task(user_id):
) )
@app.task(queue=MEDIUM) @app.task(queue=SUGGESTED_USERS)
def remove_suggestion_task(user_id, suggested_user_id): def remove_suggestion_task(user_id, suggested_user_id):
"""remove a specific user from a specific user's suggestions""" """remove a specific user from a specific user's suggestions"""
suggested_user = models.User.objects.get(id=suggested_user_id) suggested_user = models.User.objects.get(id=suggested_user_id)
suggested_users.remove_suggestion(user_id, suggested_user) suggested_users.remove_suggestion(user_id, suggested_user)
@app.task(queue=LOW) @app.task(queue=SUGGESTED_USERS)
def bulk_remove_instance_task(instance_id): def bulk_remove_instance_task(instance_id):
"""remove a bunch of users from recs""" """remove a bunch of users from recs"""
for user in models.User.objects.filter(federated_server__id=instance_id): for user in models.User.objects.filter(federated_server__id=instance_id):
@ -282,7 +282,7 @@ def bulk_remove_instance_task(instance_id):
) )
@app.task(queue=LOW) @app.task(queue=SUGGESTED_USERS)
def bulk_add_instance_task(instance_id): def bulk_add_instance_task(instance_id):
"""remove a bunch of users from recs""" """remove a bunch of users from recs"""
for user in models.User.objects.filter(federated_server__id=instance_id): for user in models.User.objects.filter(federated_server__id=instance_id):

View file

@ -10,11 +10,19 @@ app = Celery(
"tasks", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND "tasks", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND
) )
# priorities # priorities - for backwards compatibility, will be removed next release
LOW = "low_priority" LOW = "low_priority"
MEDIUM = "medium_priority" MEDIUM = "medium_priority"
HIGH = "high_priority" HIGH = "high_priority"
# import items get their own queue because they're such a pain in the ass
STREAMS = "streams"
IMAGES = "images"
SUGGESTED_USERS = "suggested_users"
EMAIL = "email"
CONNECTORS = "connectors"
LISTS = "lists"
INBOX = "inbox"
IMPORTS = "imports" IMPORTS = "imports"
# I keep making more queues?? this one broadcasting out IMPORT_TRIGGERED = "import_triggered"
BROADCAST = "broadcast" BROADCAST = "broadcast"
MISC = "misc"

View file

@ -21,6 +21,76 @@
<section class="block content"> <section class="block content">
<h2>{% trans "Queues" %}</h2> <h2>{% trans "Queues" %}</h2>
<div class="columns has-text-centered is-multiline"> <div class="columns has-text-centered is-multiline">
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Streams" %}</p>
<p class="title is-5">{{ queues.streams|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Broadcasts" %}</p>
<p class="title is-5">{{ queues.broadcast|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Inbox" %}</p>
<p class="title is-5">{{ queues.inbox|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Imports" %}</p>
<p class="title is-5">{{ queues.imports|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Import triggered" %}</p>
<p class="title is-5">{{ queues.import_triggered|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Connectors" %}</p>
<p class="title is-5">{{ queues.connectors|intcomma }}</p>
</div>
</div>
<div class="column is-6">
<div class="notification">
<p class="header">{% trans "Images" %}</p>
<p class="title is-5">{{ queues.images|intcomma }}</p>
</div>
</div>
<div class="column is-6">
<div class="notification">
<p class="header">{% trans "Suggested Users" %}</p>
<p class="title is-5">{{ queues.suggested_users|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Lists" %}</p>
<p class="title is-5">{{ queues.lists|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Email" %}</p>
<p class="title is-5">{{ queues.email|intcomma }}</p>
</div>
</div>
<div class="column is-4">
<div class="notification">
<p class="header">{% trans "Misc" %}</p>
<p class="title is-5">{{ queues.misc|intcomma }}</p>
</div>
</div>
<div class="column is-4"> <div class="column is-4">
<div class="notification"> <div class="notification">
<p class="header">{% trans "Low priority" %}</p> <p class="header">{% trans "Low priority" %}</p>
@ -39,18 +109,6 @@
<p class="title is-5">{{ queues.high_priority|intcomma }}</p> <p class="title is-5">{{ queues.high_priority|intcomma }}</p>
</div> </div>
</div> </div>
<div class="column is-6">
<div class="notification">
<p class="header">{% trans "Imports" %}</p>
<p class="title is-5">{{ queues.imports|intcomma }}</p>
</div>
</div>
<div class="column is-6">
<div class="notification">
<p class="header">{% trans "Broadcasts" %}</p>
<p class="title is-5">{{ queues.broadcast|intcomma }}</p>
</div>
</div>
</div> </div>
</section> </section>
{% else %} {% else %}

View file

@ -64,7 +64,7 @@ class ActivitystreamsSignals(TestCase):
self.assertEqual(mock.call_count, 1) self.assertEqual(mock.call_count, 1)
args = mock.call_args[1] args = mock.call_args[1]
self.assertEqual(args["args"][0], status.id) self.assertEqual(args["args"][0], status.id)
self.assertEqual(args["queue"], "high_priority") self.assertEqual(args["queue"], "streams")
def test_add_status_on_create_created_low_priority(self, *_): def test_add_status_on_create_created_low_priority(self, *_):
"""a new statuses has entered""" """a new statuses has entered"""
@ -82,7 +82,7 @@ class ActivitystreamsSignals(TestCase):
self.assertEqual(mock.call_count, 1) self.assertEqual(mock.call_count, 1)
args = mock.call_args[1] args = mock.call_args[1]
self.assertEqual(args["args"][0], status.id) self.assertEqual(args["args"][0], status.id)
self.assertEqual(args["queue"], "low_priority") self.assertEqual(args["queue"], "import_triggered")
# published later than yesterday # published later than yesterday
status = models.Status.objects.create( status = models.Status.objects.create(
@ -97,7 +97,7 @@ class ActivitystreamsSignals(TestCase):
self.assertEqual(mock.call_count, 1) self.assertEqual(mock.call_count, 1)
args = mock.call_args[1] args = mock.call_args[1]
self.assertEqual(args["args"][0], status.id) self.assertEqual(args["args"][0], status.id)
self.assertEqual(args["queue"], "low_priority") self.assertEqual(args["queue"], "import_triggered")
def test_populate_streams_on_account_create_command(self, *_): def test_populate_streams_on_account_create_command(self, *_):
"""create streams for a user""" """create streams for a user"""

View file

@ -145,7 +145,7 @@ class GenericImporter(TestCase):
) as mock: ) as mock:
import_item_task(import_item.id) import_item_task(import_item.id)
kwargs = mock.call_args.kwargs kwargs = mock.call_args.kwargs
self.assertEqual(kwargs["queue"], "low_priority") self.assertEqual(kwargs["queue"], "import_triggered")
import_item.refresh_from_db() import_item.refresh_from_db()
def test_complete_job(self, *_): def test_complete_job(self, *_):

View file

@ -11,7 +11,23 @@ from django import forms
import redis import redis
from celerywyrm import settings from celerywyrm import settings
from bookwyrm.tasks import app as celery, LOW, MEDIUM, HIGH, IMPORTS, BROADCAST from bookwyrm.tasks import (
app as celery,
LOW,
MEDIUM,
HIGH,
STREAMS,
IMAGES,
SUGGESTED_USERS,
EMAIL,
CONNECTORS,
LISTS,
INBOX,
IMPORTS,
IMPORT_TRIGGERED,
BROADCAST,
MISC,
)
r = redis.from_url(settings.REDIS_BROKER_URL) r = redis.from_url(settings.REDIS_BROKER_URL)
@ -41,8 +57,17 @@ class CeleryStatus(View):
LOW: r.llen(LOW), LOW: r.llen(LOW),
MEDIUM: r.llen(MEDIUM), MEDIUM: r.llen(MEDIUM),
HIGH: r.llen(HIGH), HIGH: r.llen(HIGH),
STREAMS: r.llen(STREAMS),
IMAGES: r.llen(IMAGES),
SUGGESTED_USERS: r.llen(SUGGESTED_USERS),
EMAIL: r.llen(EMAIL),
CONNECTORS: r.llen(CONNECTORS),
LISTS: r.llen(LISTS),
INBOX: r.llen(INBOX),
IMPORTS: r.llen(IMPORTS), IMPORTS: r.llen(IMPORTS),
IMPORT_TRIGGERED: r.llen(IMPORT_TRIGGERED),
BROADCAST: r.llen(BROADCAST), BROADCAST: r.llen(BROADCAST),
MISC: r.llen(MISC),
} }
# pylint: disable=broad-except # pylint: disable=broad-except
except Exception as err: except Exception as err:
@ -88,8 +113,17 @@ class ClearCeleryForm(forms.Form):
(LOW, "Low prioirty"), (LOW, "Low prioirty"),
(MEDIUM, "Medium priority"), (MEDIUM, "Medium priority"),
(HIGH, "High priority"), (HIGH, "High priority"),
(STREAMS, "Streams"),
(IMAGES, "Images"),
(SUGGESTED_USERS, "Suggested users"),
(EMAIL, "Email"),
(CONNECTORS, "Connectors"),
(LISTS, "Lists"),
(INBOX, "Inbox"),
(IMPORTS, "Imports"), (IMPORTS, "Imports"),
(IMPORT_TRIGGERED, "Import triggered"),
(BROADCAST, "Broadcasts"), (BROADCAST, "Broadcasts"),
(MISC, "Misc"),
], ],
widget=forms.CheckboxSelectMultiple, widget=forms.CheckboxSelectMultiple,
) )

View file

@ -13,7 +13,7 @@ from django.views import View
from django.views.decorators.csrf import csrf_exempt from django.views.decorators.csrf import csrf_exempt
from bookwyrm import activitypub, models from bookwyrm import activitypub, models
from bookwyrm.tasks import app, MEDIUM, HIGH from bookwyrm.tasks import app, INBOX
from bookwyrm.signatures import Signature from bookwyrm.signatures import Signature
from bookwyrm.utils import regex from bookwyrm.utils import regex
@ -59,11 +59,7 @@ class Inbox(View):
return HttpResponse() return HttpResponse()
return HttpResponse(status=401) return HttpResponse(status=401)
# Make activities relating to follow/unfollow a high priority sometimes_async_activity_task(activity_json)
high = ["Follow", "Accept", "Reject", "Block", "Unblock", "Undo"]
priority = HIGH if activity_json["type"] in high else MEDIUM
sometimes_async_activity_task(activity_json, queue=priority)
return HttpResponse() return HttpResponse()
@ -101,7 +97,7 @@ def raise_is_blocked_activity(activity_json):
raise PermissionDenied() raise PermissionDenied()
def sometimes_async_activity_task(activity_json, queue=MEDIUM): def sometimes_async_activity_task(activity_json):
"""Sometimes we can effectively respond to a request without queuing a new task, """Sometimes we can effectively respond to a request without queuing a new task,
and whenever that is possible, we should do it.""" and whenever that is possible, we should do it."""
activity = activitypub.parse(activity_json) activity = activitypub.parse(activity_json)
@ -111,10 +107,10 @@ def sometimes_async_activity_task(activity_json, queue=MEDIUM):
activity.action(allow_external_connections=False) activity.action(allow_external_connections=False)
except activitypub.ActivitySerializerError: except activitypub.ActivitySerializerError:
# if that doesn't work, run it asynchronously # if that doesn't work, run it asynchronously
activity_task.apply_async(args=(activity_json,), queue=queue) activity_task.apply_async(args=(activity_json,))
@app.task(queue=MEDIUM) @app.task(queue=INBOX)
def activity_task(activity_json): def activity_task(activity_json):
"""do something with this json we think is legit""" """do something with this json we think is legit"""
# lets see if the activitypub module can make sense of this json # lets see if the activitypub module can make sense of this json

View file

@ -6,7 +6,7 @@ After=network.target postgresql.service redis.service
User=bookwyrm User=bookwyrm
Group=bookwyrm Group=bookwyrm
WorkingDirectory=/opt/bookwyrm/ WorkingDirectory=/opt/bookwyrm/
ExecStart=/opt/bookwyrm/venv/bin/celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,imports,broadcast ExecStart=/opt/bookwyrm/venv/bin/celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,streams,images,suggested_users,email,connectors,lists,inbox,imports,import_triggered,broadcast,misc
StandardOutput=journal StandardOutput=journal
StandardError=inherit StandardError=inherit

View file

@ -62,7 +62,8 @@ services:
build: . build: .
networks: networks:
- main - main
command: celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,imports,broadcast command: celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,streams,images,suggested_users,email,connectors,lists,inbox,imports,import_triggered,broadcast,misc
volumes: volumes:
- .:/app - .:/app
- static_volume:/app/static - static_volume:/app/static