Revoke tasks

This commit is contained in:
Mouse Reeve 2022-11-05 15:40:36 -07:00
parent 6792b3d7b8
commit 76fb4c9280
2 changed files with 42 additions and 18 deletions

View file

@ -0,0 +1,18 @@
# Generated by Django 3.2.15 on 2022-11-05 22:28
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bookwyrm", "0161_alter_importjob_status"),
]
operations = [
migrations.AddField(
model_name="importjob",
name="task_id",
field=models.CharField(blank=True, max_length=200, null=True),
),
]

View file

@ -62,6 +62,7 @@ class ImportJob(models.Model):
source = models.CharField(max_length=100)
privacy = models.CharField(max_length=255, default="public", choices=PrivacyLevels)
retry = models.BooleanField(default=False)
task_id = models.CharField(max_length=200, null=True, blank=True)
complete = models.BooleanField(default=False)
status = models.CharField(
@ -70,15 +71,17 @@ class ImportJob(models.Model):
def start_job(self):
"""Report that the job has started"""
start_import_task.delay(self.id)
task = start_import_task.delay(self.id)
self.task_id = task.id
self.status = "active"
self.save(update_fields=["status"])
self.save(update_fields=["status", "task_id"])
def complete_job(self):
"""Report that the job has completed"""
self.status = "complete"
self.complete = True
self.pending_items.update(fail_reason=_("Import stopped"))
self.save(update_fields=["status", "complete"])
def stop_job(self):
@ -86,6 +89,14 @@ class ImportJob(models.Model):
self.status = "stopped"
self.complete = True
self.save(update_fields=["status", "complete"])
self.pending_items.update(fail_reason=_("Import stopped"))
# stop starting
app.control.revoke(self.task_id, terminate=True)
tasks = self.pending_items.filter(task_id__isnull=False).values_list(
"task_id", flat=True
)
app.control.revoke(list(tasks))
@property
def pending_items(self):
@ -143,12 +154,13 @@ class ImportItem(models.Model):
def update_job(self):
"""let the job know when the items get work done"""
job = self.job
if job.complete:
return
job.updated_date = timezone.now()
job.save()
if not job.pending_items.exists() and not job.complete:
job.status = "complete"
job.complete = True
job.save(update_fields=["complete", "status"])
job.complete_job()
def resolve(self):
"""try various ways to lookup a book"""
@ -318,6 +330,10 @@ class ImportItem(models.Model):
def start_import_task(job_id):
"""trigger the child tasks for each row"""
job = ImportJob.objects.get(id=job_id)
# don't start the job if it was stopped from the UI
if job.complete:
return
# these are sub-tasks so that one big task doesn't use up all the memory in celery
for item in job.items.all():
task = import_item_task.delay(item.id)
@ -327,19 +343,6 @@ def start_import_task(job_id):
job.save()
@app.task(queue="low_priority")
def stop_import_task(job_id):
"""Wait, let's not"""
job = ImportJob.objects.get(id=job_id)
job.status = "stopped"
job.complete = True
job.save(update_fields=["status", "complete"])
for item in job.items.filter(task_id__isnull=False).values_list(
"task_id", flat=True
):
app.control.revoke(item.task_id)
@app.task(queue="low_priority")
def import_item_task(item_id):
"""resolve a row into a book"""
@ -369,6 +372,9 @@ def import_item_task(item_id):
def handle_imported_book(item):
"""process a csv and then post about it"""
job = item.job
if job.complete:
return
user = job.user
if isinstance(item.book, Work):
item.book = item.book.default_edition