diff --git a/bookwyrm/activitystreams.py b/bookwyrm/activitystreams.py index 4896e07d9..a9ca17e24 100644 --- a/bookwyrm/activitystreams.py +++ b/bookwyrm/activitystreams.py @@ -7,7 +7,7 @@ from django.utils import timezone from bookwyrm import models from bookwyrm.redis_store import RedisStore, r -from bookwyrm.tasks import app +from bookwyrm.tasks import app, LOW, MEDIUM, HIGH class ActivityStream(RedisStore): @@ -277,7 +277,18 @@ def add_status_on_create(sender, instance, created, *args, **kwargs): def add_status_on_create_command(sender, instance, created): """runs this code only after the database commit completes""" - add_status_task.delay(instance.id, increment_unread=created) + priority=HIGH + # check if this is an old status, de-prioritize if so + # (this will happen if federation is very slow, or, more expectedly, on csv import) + one_day = 60 * 60 * 24 + if (instance.created_date - instance.published_date).seconds > one_day: + priority=LOW + + add_status_task.apply_async( + args=(instance.id,), + kwargs={"increment_unread": created}, + queue=priority, + ) if sender == models.Boost: handle_boost_task.delay(instance.id) @@ -409,7 +420,7 @@ def remove_statuses_on_unshelve(sender, instance, *args, **kwargs): # ---- TASKS -@app.task(queue="low_priority") +@app.task(queue=LOW) def add_book_statuses_task(user_id, book_id): """add statuses related to a book on shelve""" user = models.User.objects.get(id=user_id) @@ -417,7 +428,7 @@ def add_book_statuses_task(user_id, book_id): BooksStream().add_book_statuses(user, book) -@app.task(queue="low_priority") +@app.task(queue=LOW) def remove_book_statuses_task(user_id, book_id): """remove statuses about a book from a user's books feed""" user = models.User.objects.get(id=user_id) @@ -425,7 +436,7 @@ def remove_book_statuses_task(user_id, book_id): BooksStream().remove_book_statuses(user, book) -@app.task(queue="medium_priority") +@app.task(queue=MEDIUM) def populate_stream_task(stream, user_id): """background task for populating an empty activitystream""" user = models.User.objects.get(id=user_id) @@ -433,7 +444,7 @@ def populate_stream_task(stream, user_id): stream.populate_streams(user) -@app.task(queue="medium_priority") +@app.task(queue=MEDIUM) def remove_status_task(status_ids): """remove a status from any stream it might be in""" # this can take an id or a list of ids @@ -446,7 +457,7 @@ def remove_status_task(status_ids): stream.remove_object_from_related_stores(status) -@app.task(queue="high_priority") +@app.task(queue=HIGH) def add_status_task(status_id, increment_unread=False): """add a status to any stream it should be in""" status = models.Status.objects.get(id=status_id) @@ -458,7 +469,7 @@ def add_status_task(status_id, increment_unread=False): stream.add_status(status, increment_unread=increment_unread) -@app.task(queue="medium_priority") +@app.task(queue=MEDIUM) def remove_user_statuses_task(viewer_id, user_id, stream_list=None): """remove all statuses by a user from a viewer's stream""" stream_list = [streams[s] for s in stream_list] if stream_list else streams.values() @@ -468,7 +479,7 @@ def remove_user_statuses_task(viewer_id, user_id, stream_list=None): stream.remove_user_statuses(viewer, user) -@app.task(queue="medium_priority") +@app.task(queue=MEDIUM) def add_user_statuses_task(viewer_id, user_id, stream_list=None): """add all statuses by a user to a viewer's stream""" stream_list = [streams[s] for s in stream_list] if stream_list else streams.values() @@ -478,7 +489,7 @@ def add_user_statuses_task(viewer_id, user_id, stream_list=None): stream.add_user_statuses(viewer, user) -@app.task(queue="medium_priority") +@app.task(queue=MEDIUM) def handle_boost_task(boost_id): """remove the original post and other, earlier boosts""" instance = models.Status.objects.get(id=boost_id) diff --git a/bookwyrm/importers/importer.py b/bookwyrm/importers/importer.py index d7f32c88e..3e0b0c282 100644 --- a/bookwyrm/importers/importer.py +++ b/bookwyrm/importers/importer.py @@ -7,7 +7,7 @@ from django.utils.translation import gettext_lazy as _ from bookwyrm import models from bookwyrm.models import ImportJob, ImportItem -from bookwyrm.tasks import app +from bookwyrm.tasks import app, LOW logger = logging.getLogger(__name__) @@ -201,4 +201,4 @@ def handle_imported_book(source, user, item, include_reviews, privacy): privacy=privacy, ) # only broadcast this review to other bookwyrm instances - review.save(software="bookwyrm") + review.save(software="bookwyrm", priority=LOW)