From 6986fc90251c3cf2f991760568546cafc1cab335 Mon Sep 17 00:00:00 2001 From: Wesley Aptekar-Cassels Date: Thu, 6 Apr 2023 05:48:24 -0400 Subject: [PATCH] Add form to remove tasks from Celery --- bookwyrm/templates/settings/celery.html | 29 ++++++++++++++ bookwyrm/views/admin/celery_status.py | 50 +++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/bookwyrm/templates/settings/celery.html b/bookwyrm/templates/settings/celery.html index 65315da01..5f79dfd9d 100644 --- a/bookwyrm/templates/settings/celery.html +++ b/bookwyrm/templates/settings/celery.html @@ -116,6 +116,35 @@ {% endif %} +
+

{% trans "Clear Queues" %}

+ +
+ + {% trans "Clearing queues can cause serious problems including data loss! Only play with this if you really know what you're doing. You must shut down the Celery worker before you do this." %} +
+ +
+ {% csrf_token %} + +
+
+

{{ form.queues.label_tag }}

+ {{ form.queues }} +
+ +
+

{{ form.tasks.label_tag }}

+ {{ form.tasks }} +
+
+ +
+ +
+
+
+ {% if errors %}

{% trans "Errors" %}

diff --git a/bookwyrm/views/admin/celery_status.py b/bookwyrm/views/admin/celery_status.py index e0b1fe18c..392d7c471 100644 --- a/bookwyrm/views/admin/celery_status.py +++ b/bookwyrm/views/admin/celery_status.py @@ -1,10 +1,13 @@ """ celery status """ +import json + from django.contrib.auth.decorators import login_required, permission_required from django.http import HttpResponse from django.template.response import TemplateResponse from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.http import require_GET +from django import forms import redis from celerywyrm import settings @@ -46,14 +49,61 @@ class CeleryStatus(View): queues = None errors.append(err) + form = ClearCeleryForm() + data = { "stats": stats, "active_tasks": active_tasks, "queues": queues, + "form": form, "errors": errors, } return TemplateResponse(request, "settings/celery.html", data) + def post(self, request): + """Submit form to clear queues""" + form = ClearCeleryForm(request.POST) + if form.is_valid(): + if len(celery.control.ping()) != 0: + return HttpResponse( + "Refusing to delete tasks while Celery worker is active" + ) + pipeline = r.pipeline() + for queue in form.cleaned_data["queues"]: + for task in r.lrange(queue, 0, -1): + task_json = json.loads(task) + if task_json["headers"]["task"] in form.cleaned_data["tasks"]: + pipeline.lrem(queue, 0, task) + results = pipeline.execute() + + return HttpResponse(f"Deleted {sum(results)} tasks") + + +class ClearCeleryForm(forms.Form): + """Form to clear queues""" + + queues = forms.MultipleChoiceField( + label="Queues", + choices=[ + (LOW, "Low prioirty"), + (MEDIUM, "Medium priority"), + (HIGH, "High priority"), + (IMPORTS, "Imports"), + (BROADCAST, "Broadcasts"), + ], + widget=forms.CheckboxSelectMultiple, + ) + tasks = forms.MultipleChoiceField( + label="Tasks", choices=[], widget=forms.CheckboxSelectMultiple + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + celery.loader.import_default_modules() + self.fields["tasks"].choices = sorted( + [(k, k) for k in celery.tasks.keys() if not k.startswith("celery.")] + ) + @require_GET # pylint: disable=unused-argument