Add form to remove tasks from Celery

This commit is contained in:
Wesley Aptekar-Cassels 2023-04-06 05:48:24 -04:00
parent e909cbfd4a
commit 6986fc9025
2 changed files with 79 additions and 0 deletions

View file

@ -116,6 +116,35 @@
</div>
{% endif %}
<section class="block">
<div class="content"><h2>{% trans "Clear Queues" %}</h2></div>
<div class="content notification is-warning is-flex is-align-items-start">
<span class="icon icon-warning is-size-4 pr-3" aria-hidden="true"></span>
{% trans "Clearing queues can cause serious problems including data loss! Only play with this if you really know what you're doing. You must shut down the Celery worker before you do this." %}
</div>
<form action="{% url 'settings-celery' %}" method="post">
{% csrf_token %}
<div class="columns">
<div class="column is-3">
<div class="content"><h3>{{ form.queues.label_tag }}</h3></div>
{{ form.queues }}
</div>
<div class="column is-9">
<div class="content"><h3>{{ form.tasks.label_tag }}</h3></div>
{{ form.tasks }}
</div>
</div>
<div class="buttons is-right">
<button type="submit" class="button is-danger">{% trans "Clear Queues" %}</button>
</div>
</form>
</section>
{% if errors %}
<div class="block content">
<h2>{% trans "Errors" %}</h2>

View file

@ -1,10 +1,13 @@
""" celery status """
import json
from django.contrib.auth.decorators import login_required, permission_required
from django.http import HttpResponse
from django.template.response import TemplateResponse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.http import require_GET
from django import forms
import redis
from celerywyrm import settings
@ -46,14 +49,61 @@ class CeleryStatus(View):
queues = None
errors.append(err)
form = ClearCeleryForm()
data = {
"stats": stats,
"active_tasks": active_tasks,
"queues": queues,
"form": form,
"errors": errors,
}
return TemplateResponse(request, "settings/celery.html", data)
def post(self, request):
"""Submit form to clear queues"""
form = ClearCeleryForm(request.POST)
if form.is_valid():
if len(celery.control.ping()) != 0:
return HttpResponse(
"Refusing to delete tasks while Celery worker is active"
)
pipeline = r.pipeline()
for queue in form.cleaned_data["queues"]:
for task in r.lrange(queue, 0, -1):
task_json = json.loads(task)
if task_json["headers"]["task"] in form.cleaned_data["tasks"]:
pipeline.lrem(queue, 0, task)
results = pipeline.execute()
return HttpResponse(f"Deleted {sum(results)} tasks")
class ClearCeleryForm(forms.Form):
"""Form to clear queues"""
queues = forms.MultipleChoiceField(
label="Queues",
choices=[
(LOW, "Low prioirty"),
(MEDIUM, "Medium priority"),
(HIGH, "High priority"),
(IMPORTS, "Imports"),
(BROADCAST, "Broadcasts"),
],
widget=forms.CheckboxSelectMultiple,
)
tasks = forms.MultipleChoiceField(
label="Tasks", choices=[], widget=forms.CheckboxSelectMultiple
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
celery.loader.import_default_modules()
self.fields["tasks"].choices = sorted(
[(k, k) for k in celery.tasks.keys() if not k.startswith("celery.")]
)
@require_GET
# pylint: disable=unused-argument