diff --git a/bookwyrm/templates/settings/celery.html b/bookwyrm/templates/settings/celery.html index 65315da01..5f79dfd9d 100644 --- a/bookwyrm/templates/settings/celery.html +++ b/bookwyrm/templates/settings/celery.html @@ -116,6 +116,35 @@ {% endif %} +
+

{% trans "Clear Queues" %}

+ +
+ + {% trans "Clearing queues can cause serious problems including data loss! Only play with this if you really know what you're doing. You must shut down the Celery worker before you do this." %} +
+ +
+ {% csrf_token %} + +
+
+

{{ form.queues.label_tag }}

+ {{ form.queues }} +
+ +
+

{{ form.tasks.label_tag }}

+ {{ form.tasks }} +
+
+ +
+ +
+
+
+ {% if errors %}

{% trans "Errors" %}

diff --git a/bookwyrm/views/admin/celery_status.py b/bookwyrm/views/admin/celery_status.py index 8175ae405..6263d8654 100644 --- a/bookwyrm/views/admin/celery_status.py +++ b/bookwyrm/views/admin/celery_status.py @@ -1,10 +1,13 @@ """ celery status """ +import json + from django.contrib.auth.decorators import login_required, permission_required from django.http import HttpResponse from django.template.response import TemplateResponse from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.http import require_GET +from django import forms import redis from celerywyrm import settings @@ -46,14 +49,61 @@ class CeleryStatus(View): queues = None errors.append(err) + form = ClearCeleryForm() + data = { "stats": stats, "active_tasks": active_tasks, "queues": queues, + "form": form, "errors": errors, } return TemplateResponse(request, "settings/celery.html", data) + def post(self, request): + """Submit form to clear queues""" + form = ClearCeleryForm(request.POST) + if form.is_valid(): + if len(celery.control.ping()) != 0: + return HttpResponse( + "Refusing to delete tasks while Celery worker is active" + ) + pipeline = r.pipeline() + for queue in form.cleaned_data["queues"]: + for task in r.lrange(queue, 0, -1): + task_json = json.loads(task) + if task_json["headers"]["task"] in form.cleaned_data["tasks"]: + pipeline.lrem(queue, 0, task) + results = pipeline.execute() + + return HttpResponse(f"Deleted {sum(results)} tasks") + + +class ClearCeleryForm(forms.Form): + """Form to clear queues""" + + queues = forms.MultipleChoiceField( + label="Queues", + choices=[ + (LOW, "Low prioirty"), + (MEDIUM, "Medium priority"), + (HIGH, "High priority"), + (IMPORTS, "Imports"), + (BROADCAST, "Broadcasts"), + ], + widget=forms.CheckboxSelectMultiple, + ) + tasks = forms.MultipleChoiceField( + label="Tasks", choices=[], widget=forms.CheckboxSelectMultiple + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + celery.loader.import_default_modules() + self.fields["tasks"].choices = sorted( + [(k, k) for k in celery.tasks.keys() if not k.startswith("celery.")] + ) + @require_GET # pylint: disable=unused-argument