mirror of
https://github.com/bookwyrm-social/bookwyrm.git
synced 2024-11-18 23:51:10 +00:00
Merge pull request #1376 from bookwyrm-social/celery-queues
Celery queues
This commit is contained in:
commit
88f28ed5da
13 changed files with 28 additions and 36 deletions
|
@ -213,7 +213,7 @@ class ActivityObject:
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
@transaction.atomic
|
@transaction.atomic
|
||||||
def set_related_field(
|
def set_related_field(
|
||||||
model_name, origin_model_name, related_field_name, related_remote_id, data
|
model_name, origin_model_name, related_field_name, related_remote_id, data
|
||||||
|
|
|
@ -395,7 +395,7 @@ def remove_statuses_on_unshelve(sender, instance, *args, **kwargs):
|
||||||
# ---- TASKS
|
# ---- TASKS
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def add_book_statuses_task(user_id, book_id):
|
def add_book_statuses_task(user_id, book_id):
|
||||||
"""add statuses related to a book on shelve"""
|
"""add statuses related to a book on shelve"""
|
||||||
user = models.User.objects.get(id=user_id)
|
user = models.User.objects.get(id=user_id)
|
||||||
|
@ -403,7 +403,7 @@ def add_book_statuses_task(user_id, book_id):
|
||||||
BooksStream().add_book_statuses(user, book)
|
BooksStream().add_book_statuses(user, book)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def remove_book_statuses_task(user_id, book_id):
|
def remove_book_statuses_task(user_id, book_id):
|
||||||
"""remove statuses about a book from a user's books feed"""
|
"""remove statuses about a book from a user's books feed"""
|
||||||
user = models.User.objects.get(id=user_id)
|
user = models.User.objects.get(id=user_id)
|
||||||
|
@ -411,7 +411,7 @@ def remove_book_statuses_task(user_id, book_id):
|
||||||
BooksStream().remove_book_statuses(user, book)
|
BooksStream().remove_book_statuses(user, book)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def populate_stream_task(stream, user_id):
|
def populate_stream_task(stream, user_id):
|
||||||
"""background task for populating an empty activitystream"""
|
"""background task for populating an empty activitystream"""
|
||||||
user = models.User.objects.get(id=user_id)
|
user = models.User.objects.get(id=user_id)
|
||||||
|
@ -419,7 +419,7 @@ def populate_stream_task(stream, user_id):
|
||||||
stream.populate_streams(user)
|
stream.populate_streams(user)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def remove_status_task(status_ids):
|
def remove_status_task(status_ids):
|
||||||
"""remove a status from any stream it might be in"""
|
"""remove a status from any stream it might be in"""
|
||||||
# this can take an id or a list of ids
|
# this can take an id or a list of ids
|
||||||
|
@ -432,7 +432,7 @@ def remove_status_task(status_ids):
|
||||||
stream.remove_object_from_related_stores(status)
|
stream.remove_object_from_related_stores(status)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def add_status_task(status_id, increment_unread=False):
|
def add_status_task(status_id, increment_unread=False):
|
||||||
"""remove a status from any stream it might be in"""
|
"""remove a status from any stream it might be in"""
|
||||||
status = models.Status.objects.get(id=status_id)
|
status = models.Status.objects.get(id=status_id)
|
||||||
|
@ -440,7 +440,7 @@ def add_status_task(status_id, increment_unread=False):
|
||||||
stream.add_status(status, increment_unread=increment_unread)
|
stream.add_status(status, increment_unread=increment_unread)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
|
def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
|
||||||
"""remove all statuses by a user from a viewer's stream"""
|
"""remove all statuses by a user from a viewer's stream"""
|
||||||
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
|
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
|
||||||
|
@ -450,9 +450,9 @@ def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
|
||||||
stream.remove_user_statuses(viewer, user)
|
stream.remove_user_statuses(viewer, user)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def add_user_statuses_task(viewer_id, user_id, stream_list=None):
|
def add_user_statuses_task(viewer_id, user_id, stream_list=None):
|
||||||
"""remove all statuses by a user from a viewer's stream"""
|
"""add all statuses by a user to a viewer's stream"""
|
||||||
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
|
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
|
||||||
viewer = models.User.objects.get(id=viewer_id)
|
viewer = models.User.objects.get(id=viewer_id)
|
||||||
user = models.User.objects.get(id=user_id)
|
user = models.User.objects.get(id=user_id)
|
||||||
|
@ -460,7 +460,7 @@ def add_user_statuses_task(viewer_id, user_id, stream_list=None):
|
||||||
stream.add_user_statuses(viewer, user)
|
stream.add_user_statuses(viewer, user)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def handle_boost_task(boost_id):
|
def handle_boost_task(boost_id):
|
||||||
"""remove the original post and other, earlier boosts"""
|
"""remove the original post and other, earlier boosts"""
|
||||||
instance = models.Status.objects.get(id=boost_id)
|
instance = models.Status.objects.get(id=boost_id)
|
||||||
|
|
|
@ -119,7 +119,7 @@ def get_or_create_connector(remote_id):
|
||||||
return load_connector(connector_info)
|
return load_connector(connector_info)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def load_more_data(connector_id, book_id):
|
def load_more_data(connector_id, book_id):
|
||||||
"""background the work of getting all 10,000 editions of LoTR"""
|
"""background the work of getting all 10,000 editions of LoTR"""
|
||||||
connector_info = models.Connector.objects.get(id=connector_id)
|
connector_info = models.Connector.objects.get(id=connector_id)
|
||||||
|
|
|
@ -64,7 +64,7 @@ def format_email(email_name, data):
|
||||||
return (subject, html_content, text_content)
|
return (subject, html_content, text_content)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="high_priority")
|
||||||
def send_email(recipient, subject, html_content, text_content):
|
def send_email(recipient, subject, html_content, text_content):
|
||||||
"""use a task to send the email"""
|
"""use a task to send the email"""
|
||||||
email = EmailMultiAlternatives(
|
email = EmailMultiAlternatives(
|
||||||
|
|
|
@ -61,7 +61,7 @@ class Importer:
|
||||||
job.save()
|
job.save()
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def import_data(source, job_id):
|
def import_data(source, job_id):
|
||||||
"""does the actual lookup work in a celery task"""
|
"""does the actual lookup work in a celery task"""
|
||||||
job = ImportJob.objects.get(id=job_id)
|
job = ImportJob.objects.get(id=job_id)
|
||||||
|
|
|
@ -502,7 +502,7 @@ def unfurl_related_field(related_field, sort_field=None):
|
||||||
return related_field.remote_id
|
return related_field.remote_id
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def broadcast_task(sender_id, activity, recipients):
|
def broadcast_task(sender_id, activity, recipients):
|
||||||
"""the celery task for broadcast"""
|
"""the celery task for broadcast"""
|
||||||
user_model = apps.get_model("bookwyrm.User", require_ready=True)
|
user_model = apps.get_model("bookwyrm.User", require_ready=True)
|
||||||
|
|
|
@ -425,7 +425,7 @@ class AnnualGoal(BookWyrmModel):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def set_remote_server(user_id):
|
def set_remote_server(user_id):
|
||||||
"""figure out the user's remote server in the background"""
|
"""figure out the user's remote server in the background"""
|
||||||
user = User.objects.get(id=user_id)
|
user = User.objects.get(id=user_id)
|
||||||
|
@ -464,7 +464,7 @@ def get_or_create_remote_server(domain):
|
||||||
return server
|
return server
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def get_remote_reviews(outbox):
|
def get_remote_reviews(outbox):
|
||||||
"""ingest reviews by a new remote bookwyrm user"""
|
"""ingest reviews by a new remote bookwyrm user"""
|
||||||
outbox_page = outbox + "?page=true&type=Review"
|
outbox_page = outbox + "?page=true&type=Review"
|
||||||
|
|
|
@ -352,7 +352,7 @@ def save_and_cleanup(image, instance=None):
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=invalid-name
|
# pylint: disable=invalid-name
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def generate_site_preview_image_task():
|
def generate_site_preview_image_task():
|
||||||
"""generate preview_image for the website"""
|
"""generate preview_image for the website"""
|
||||||
if not settings.ENABLE_PREVIEW_IMAGES:
|
if not settings.ENABLE_PREVIEW_IMAGES:
|
||||||
|
@ -377,7 +377,7 @@ def generate_site_preview_image_task():
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=invalid-name
|
# pylint: disable=invalid-name
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def generate_edition_preview_image_task(book_id):
|
def generate_edition_preview_image_task(book_id):
|
||||||
"""generate preview_image for a book"""
|
"""generate preview_image for a book"""
|
||||||
if not settings.ENABLE_PREVIEW_IMAGES:
|
if not settings.ENABLE_PREVIEW_IMAGES:
|
||||||
|
@ -402,7 +402,7 @@ def generate_edition_preview_image_task(book_id):
|
||||||
save_and_cleanup(image, instance=book)
|
save_and_cleanup(image, instance=book)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def generate_user_preview_image_task(user_id):
|
def generate_user_preview_image_task(user_id):
|
||||||
"""generate preview_image for a book"""
|
"""generate preview_image for a book"""
|
||||||
if not settings.ENABLE_PREVIEW_IMAGES:
|
if not settings.ENABLE_PREVIEW_IMAGES:
|
||||||
|
|
|
@ -194,27 +194,27 @@ def add_new_user(sender, instance, created, update_fields=None, **kwargs):
|
||||||
remove_user_task.delay(instance.id)
|
remove_user_task.delay(instance.id)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def rerank_suggestions_task(user_id):
|
def rerank_suggestions_task(user_id):
|
||||||
"""do the hard work in celery"""
|
"""do the hard work in celery"""
|
||||||
suggested_users.rerank_user_suggestions(user_id)
|
suggested_users.rerank_user_suggestions(user_id)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def rerank_user_task(user_id, update_only=False):
|
def rerank_user_task(user_id, update_only=False):
|
||||||
"""do the hard work in celery"""
|
"""do the hard work in celery"""
|
||||||
user = models.User.objects.get(id=user_id)
|
user = models.User.objects.get(id=user_id)
|
||||||
suggested_users.rerank_obj(user, update_only=update_only)
|
suggested_users.rerank_obj(user, update_only=update_only)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="low_priority")
|
||||||
def remove_user_task(user_id):
|
def remove_user_task(user_id):
|
||||||
"""do the hard work in celery"""
|
"""do the hard work in celery"""
|
||||||
user = models.User.objects.get(id=user_id)
|
user = models.User.objects.get(id=user_id)
|
||||||
suggested_users.remove_object_from_related_stores(user)
|
suggested_users.remove_object_from_related_stores(user)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def remove_suggestion_task(user_id, suggested_user_id):
|
def remove_suggestion_task(user_id, suggested_user_id):
|
||||||
"""remove a specific user from a specific user's suggestions"""
|
"""remove a specific user from a specific user's suggestions"""
|
||||||
suggested_user = models.User.objects.get(id=suggested_user_id)
|
suggested_user = models.User.objects.get(id=suggested_user_id)
|
||||||
|
|
|
@ -93,7 +93,7 @@ def is_blocked_activity(activity_json):
|
||||||
return models.FederatedServer.is_blocked(actor)
|
return models.FederatedServer.is_blocked(actor)
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task(queue="medium_priority")
|
||||||
def activity_task(activity_json):
|
def activity_task(activity_json):
|
||||||
"""do something with this json we think is legit"""
|
"""do something with this json we think is legit"""
|
||||||
# lets see if the activitypub module can make sense of this json
|
# lets see if the activitypub module can make sense of this json
|
||||||
|
|
|
@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
from . import settings
|
from . import settings # pylint: disable=unused-import
|
||||||
|
|
||||||
|
|
||||||
# set the default Django settings module for the 'celery' program.
|
# set the default Django settings module for the 'celery' program.
|
||||||
|
@ -19,13 +19,3 @@ app.config_from_object("django.conf:settings", namespace="CELERY")
|
||||||
|
|
||||||
# Load task modules from all registered Django app configs.
|
# Load task modules from all registered Django app configs.
|
||||||
app.autodiscover_tasks()
|
app.autodiscover_tasks()
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="activitypub.base_activity")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="activitystreams")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="broadcast")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="connectors.abstract_connector")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="emailing")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="goodreads_import")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="preview_images")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="models.user")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="suggested_users")
|
|
||||||
app.autodiscover_tasks(["bookwyrm"], related_name="views.inbox")
|
|
||||||
|
|
|
@ -10,6 +10,8 @@ CELERY_RESULT_BACKEND = "redis://:{}@redis_broker:{}/0".format(
|
||||||
requests.utils.quote(env("REDIS_BROKER_PASSWORD", "")), env("REDIS_BROKER_PORT")
|
requests.utils.quote(env("REDIS_BROKER_PASSWORD", "")), env("REDIS_BROKER_PORT")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
CELERY_DEFAULT_QUEUE = "low_priority"
|
||||||
|
|
||||||
CELERY_ACCEPT_CONTENT = ["json"]
|
CELERY_ACCEPT_CONTENT = ["json"]
|
||||||
CELERY_TASK_SERIALIZER = "json"
|
CELERY_TASK_SERIALIZER = "json"
|
||||||
CELERY_RESULT_SERIALIZER = "json"
|
CELERY_RESULT_SERIALIZER = "json"
|
||||||
|
|
|
@ -63,7 +63,7 @@ services:
|
||||||
build: .
|
build: .
|
||||||
networks:
|
networks:
|
||||||
- main
|
- main
|
||||||
command: celery -A celerywyrm worker -l info
|
command: celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority
|
||||||
volumes:
|
volumes:
|
||||||
- .:/app
|
- .:/app
|
||||||
- static_volume:/app/static
|
- static_volume:/app/static
|
||||||
|
|
Loading…
Reference in a new issue