diff --git a/bookwyrm/migrations/0162_importjob_task_id.py b/bookwyrm/migrations/0162_importjob_task_id.py
new file mode 100644
index 000000000..0bc7cc8de
--- /dev/null
+++ b/bookwyrm/migrations/0162_importjob_task_id.py
@@ -0,0 +1,18 @@
+# Generated by Django 3.2.15 on 2022-11-05 22:28
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("bookwyrm", "0161_alter_importjob_status"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="importjob",
+            name="task_id",
+            field=models.CharField(blank=True, max_length=200, null=True),
+        ),
+    ]
diff --git a/bookwyrm/models/import_job.py b/bookwyrm/models/import_job.py
index f648c70ab..85735fe97 100644
--- a/bookwyrm/models/import_job.py
+++ b/bookwyrm/models/import_job.py
@@ -62,6 +62,7 @@ class ImportJob(models.Model):
     source = models.CharField(max_length=100)
     privacy = models.CharField(max_length=255, default="public", choices=PrivacyLevels)
     retry = models.BooleanField(default=False)
+    task_id = models.CharField(max_length=200, null=True, blank=True)
 
     complete = models.BooleanField(default=False)
     status = models.CharField(
@@ -70,15 +71,17 @@ class ImportJob(models.Model):
 
     def start_job(self):
         """Report that the job has started"""
-        start_import_task.delay(self.id)
+        task = start_import_task.delay(self.id)
+        self.task_id = task.id
 
         self.status = "active"
-        self.save(update_fields=["status"])
+        self.save(update_fields=["status", "task_id"])
 
     def complete_job(self):
         """Report that the job has completed"""
         self.status = "complete"
         self.complete = True
+        self.pending_items.update(fail_reason=_("Import stopped"))
         self.save(update_fields=["status", "complete"])
 
     def stop_job(self):
@@ -86,6 +89,21 @@ class ImportJob(models.Model):
         self.status = "stopped"
         self.complete = True
         self.save(update_fields=["status", "complete"])
+
+        # stop the parent task so it cannot enqueue any more item tasks;
+        # task_id is nullable, so a job that was never started has none
+        if self.task_id:
+            app.control.revoke(self.task_id, terminate=True)
+
+        # read the queued item task ids *before* marking the items as stopped:
+        # the update below may remove them from `pending_items`
+        tasks = list(
+            self.pending_items.filter(task_id__isnull=False).values_list(
+                "task_id", flat=True
+            )
+        )
+        self.pending_items.update(fail_reason=_("Import stopped"))
+        if tasks:
+            app.control.revoke(tasks)
 
     @property
     def pending_items(self):
@@ -143,12 +161,13 @@ class ImportItem(models.Model):
     def update_job(self):
         """let the job know when the items get work done"""
         job = self.job
+        if job.complete:
+            return
+
         job.updated_date = timezone.now()
         job.save()
         if not job.pending_items.exists() and not job.complete:
-            job.status = "complete"
-            job.complete = True
-            job.save(update_fields=["complete", "status"])
+            job.complete_job()
 
     def resolve(self):
         """try various ways to lookup a book"""
@@ -318,6 +337,10 @@ class ImportItem(models.Model):
 def start_import_task(job_id):
     """trigger the child tasks for each row"""
     job = ImportJob.objects.get(id=job_id)
+    # don't start the job if it was stopped from the UI
+    if job.complete:
+        return
+
     # these are sub-tasks so that one big task doesn't use up all the memory in celery
     for item in job.items.all():
         task = import_item_task.delay(item.id)
@@ -327,19 +350,6 @@ def start_import_task(job_id):
     job.save()
 
 
-@app.task(queue="low_priority")
-def stop_import_task(job_id):
-    """Wait, let's not"""
-    job = ImportJob.objects.get(id=job_id)
-    job.status = "stopped"
-    job.complete = True
-    job.save(update_fields=["status", "complete"])
-    for item in job.items.filter(task_id__isnull=False).values_list(
-        "task_id", flat=True
-    ):
-        app.control.revoke(item.task_id)
-
-
 @app.task(queue="low_priority")
 def import_item_task(item_id):
     """resolve a row into a book"""
@@ -369,6 +379,9 @@ def import_item_task(item_id):
 def handle_imported_book(item):
     """process a csv and then post about it"""
     job = item.job
+    if job.complete:
+        return
+
     user = job.user
     if isinstance(item.book, Work):
         item.book = item.book.default_edition