mirror of
https://github.com/bookwyrm-social/bookwyrm.git
synced 2025-01-19 21:55:41 +00:00
Start and stop jobs from the model
This commit is contained in:
parent
97513a43d6
commit
7a36de5ebe
6 changed files with 183 additions and 145 deletions
|
@ -1,15 +1,7 @@
|
|||
""" handle reading a csv from an external service, defaults are from Goodreads """
|
||||
import csv
|
||||
import logging
|
||||
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from bookwyrm import models
|
||||
from bookwyrm.models import ImportJob, ImportItem
|
||||
from bookwyrm.tasks import app, LOW
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Importer:
|
||||
|
@ -118,135 +110,3 @@ class Importer:
|
|||
# this will re-normalize the raw data
|
||||
self.create_item(job, item.index, item.data)
|
||||
return job
|
||||
|
||||
def start_import(self, job): # pylint: disable=no-self-use
|
||||
"""initalizes a csv import job"""
|
||||
result = start_import_task.delay(job.id)
|
||||
job.task_id = result.id
|
||||
job.save()
|
||||
|
||||
|
||||
@app.task(queue="low_priority")
|
||||
def start_import_task(job_id):
|
||||
"""trigger the child tasks for each row"""
|
||||
job = ImportJob.objects.get(id=job_id)
|
||||
# these are sub-tasks so that one big task doesn't use up all the memory in celery
|
||||
for item in job.items.values_list("id", flat=True).all():
|
||||
task = import_item_task.delay(item)
|
||||
item.task_id = task.id
|
||||
item.save()
|
||||
job.status = "active"
|
||||
job.save()
|
||||
|
||||
|
||||
@app.task(queue="low_priority")
|
||||
def import_item_task(item_id):
|
||||
"""resolve a row into a book"""
|
||||
item = models.ImportItem.objects.get(id=item_id)
|
||||
# make sure the job has not been stopped
|
||||
if item.job.complete:
|
||||
return
|
||||
|
||||
try:
|
||||
item.resolve()
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
item.fail_reason = _("Error loading book")
|
||||
item.save()
|
||||
item.update_job()
|
||||
raise err
|
||||
|
||||
if item.book:
|
||||
# shelves book and handles reviews
|
||||
handle_imported_book(item)
|
||||
else:
|
||||
item.fail_reason = _("Could not find a match for book")
|
||||
|
||||
item.save()
|
||||
item.update_job()
|
||||
|
||||
|
||||
def handle_imported_book(item):
|
||||
"""process a csv and then post about it"""
|
||||
job = item.job
|
||||
user = job.user
|
||||
if isinstance(item.book, models.Work):
|
||||
item.book = item.book.default_edition
|
||||
if not item.book:
|
||||
item.fail_reason = _("Error loading book")
|
||||
item.save()
|
||||
return
|
||||
if not isinstance(item.book, models.Edition):
|
||||
item.book = item.book.edition
|
||||
|
||||
existing_shelf = models.ShelfBook.objects.filter(book=item.book, user=user).exists()
|
||||
|
||||
# shelve the book if it hasn't been shelved already
|
||||
if item.shelf and not existing_shelf:
|
||||
desired_shelf = models.Shelf.objects.get(identifier=item.shelf, user=user)
|
||||
shelved_date = item.date_added or timezone.now()
|
||||
models.ShelfBook(
|
||||
book=item.book, shelf=desired_shelf, user=user, shelved_date=shelved_date
|
||||
).save(priority=LOW)
|
||||
|
||||
for read in item.reads:
|
||||
# check for an existing readthrough with the same dates
|
||||
if models.ReadThrough.objects.filter(
|
||||
user=user,
|
||||
book=item.book,
|
||||
start_date=read.start_date,
|
||||
finish_date=read.finish_date,
|
||||
).exists():
|
||||
continue
|
||||
read.book = item.book
|
||||
read.user = user
|
||||
read.save()
|
||||
|
||||
if job.include_reviews and (item.rating or item.review) and not item.linked_review:
|
||||
# we don't know the publication date of the review,
|
||||
# but "now" is a bad guess
|
||||
published_date_guess = item.date_read or item.date_added
|
||||
if item.review:
|
||||
# pylint: disable=consider-using-f-string
|
||||
review_title = "Review of {!r} on {!r}".format(
|
||||
item.book.title,
|
||||
job.source,
|
||||
)
|
||||
review = models.Review.objects.filter(
|
||||
user=user,
|
||||
book=item.book,
|
||||
name=review_title,
|
||||
rating=item.rating,
|
||||
published_date=published_date_guess,
|
||||
).first()
|
||||
if not review:
|
||||
review = models.Review(
|
||||
user=user,
|
||||
book=item.book,
|
||||
name=review_title,
|
||||
content=item.review,
|
||||
rating=item.rating,
|
||||
published_date=published_date_guess,
|
||||
privacy=job.privacy,
|
||||
)
|
||||
review.save(software="bookwyrm", priority=LOW)
|
||||
else:
|
||||
# just a rating
|
||||
review = models.ReviewRating.objects.filter(
|
||||
user=user,
|
||||
book=item.book,
|
||||
published_date=published_date_guess,
|
||||
rating=item.rating,
|
||||
).first()
|
||||
if not review:
|
||||
review = models.ReviewRating(
|
||||
user=user,
|
||||
book=item.book,
|
||||
rating=item.rating,
|
||||
published_date=published_date_guess,
|
||||
privacy=job.privacy,
|
||||
)
|
||||
review.save(software="bookwyrm", priority=LOW)
|
||||
|
||||
# only broadcast this review to other bookwyrm instances
|
||||
item.linked_review = review
|
||||
item.save()
|
||||
|
|
|
@ -8,7 +8,18 @@ from django.utils import timezone
|
|||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from bookwyrm.connectors import connector_manager
|
||||
from bookwyrm.models import ReadThrough, User, Book, Edition
|
||||
from bookwyrm.models import (
|
||||
User,
|
||||
Book,
|
||||
Edition,
|
||||
Work,
|
||||
ShelfBook,
|
||||
Shelf,
|
||||
ReadThrough,
|
||||
Review,
|
||||
ReviewRating,
|
||||
)
|
||||
from bookwyrm.tasks import app, LOW
|
||||
from .fields import PrivacyLevels
|
||||
|
||||
|
||||
|
@ -57,6 +68,25 @@ class ImportJob(models.Model):
|
|||
max_length=50, choices=ImportStatuses, default="pending", null=True
|
||||
)
|
||||
|
||||
def start_job(self):
|
||||
"""Report that the job has started"""
|
||||
start_import_task.delay(self.id)
|
||||
|
||||
self.status = "active"
|
||||
self.save(update_fields=["status"])
|
||||
|
||||
def complete_job(self):
|
||||
"""Report that the job has completed"""
|
||||
self.status = "complete"
|
||||
self.complete = True
|
||||
self.save(update_fields=["status", "complete"])
|
||||
|
||||
def stop_job(self):
|
||||
"""Stop the job"""
|
||||
self.status = "stopped"
|
||||
self.complete = True
|
||||
self.save(update_fields=["status", "complete"])
|
||||
|
||||
@property
|
||||
def pending_items(self):
|
||||
"""items that haven't been processed yet"""
|
||||
|
@ -282,3 +312,142 @@ class ImportItem(models.Model):
|
|||
return "{} by {}".format(
|
||||
self.normalized_data.get("title"), self.normalized_data.get("authors")
|
||||
)
|
||||
|
||||
|
||||
@app.task(queue="low_priority")
|
||||
def start_import_task(job_id):
|
||||
"""trigger the child tasks for each row"""
|
||||
job = ImportJob.objects.get(id=job_id)
|
||||
# these are sub-tasks so that one big task doesn't use up all the memory in celery
|
||||
for item in job.items.all():
|
||||
task = import_item_task.delay(item.id)
|
||||
item.task_id = task.id
|
||||
item.save()
|
||||
job.status = "active"
|
||||
job.save()
|
||||
|
||||
|
||||
@app.task(queue="low_priority")
|
||||
def stop_import_task(job_id):
|
||||
"""Wait, let's not"""
|
||||
job = ImportJob.objects.get(id=job_id)
|
||||
job.status = "stopped"
|
||||
job.complete = True
|
||||
job.save(update_fields=["status", "complete"])
|
||||
for item in job.items.filter(task_id__isnull=False).values_list(
|
||||
"task_id", flat=True
|
||||
):
|
||||
app.control.revoke(item.task_id)
|
||||
|
||||
|
||||
@app.task(queue="low_priority")
|
||||
def import_item_task(item_id):
|
||||
"""resolve a row into a book"""
|
||||
item = ImportItem.objects.get(id=item_id)
|
||||
# make sure the job has not been stopped
|
||||
if item.job.complete:
|
||||
return
|
||||
|
||||
try:
|
||||
item.resolve()
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
item.fail_reason = _("Error loading book")
|
||||
item.save()
|
||||
item.update_job()
|
||||
raise err
|
||||
|
||||
if item.book:
|
||||
# shelves book and handles reviews
|
||||
handle_imported_book(item)
|
||||
else:
|
||||
item.fail_reason = _("Could not find a match for book")
|
||||
|
||||
item.save()
|
||||
item.update_job()
|
||||
|
||||
|
||||
def handle_imported_book(item):
|
||||
"""process a csv and then post about it"""
|
||||
job = item.job
|
||||
user = job.user
|
||||
if isinstance(item.book, Work):
|
||||
item.book = item.book.default_edition
|
||||
if not item.book:
|
||||
item.fail_reason = _("Error loading book")
|
||||
item.save()
|
||||
return
|
||||
if not isinstance(item.book, Edition):
|
||||
item.book = item.book.edition
|
||||
|
||||
existing_shelf = ShelfBook.objects.filter(book=item.book, user=user).exists()
|
||||
|
||||
# shelve the book if it hasn't been shelved already
|
||||
if item.shelf and not existing_shelf:
|
||||
desired_shelf = Shelf.objects.get(identifier=item.shelf, user=user)
|
||||
shelved_date = item.date_added or timezone.now()
|
||||
ShelfBook(
|
||||
book=item.book, shelf=desired_shelf, user=user, shelved_date=shelved_date
|
||||
).save(priority=LOW)
|
||||
|
||||
for read in item.reads:
|
||||
# check for an existing readthrough with the same dates
|
||||
if ReadThrough.objects.filter(
|
||||
user=user,
|
||||
book=item.book,
|
||||
start_date=read.start_date,
|
||||
finish_date=read.finish_date,
|
||||
).exists():
|
||||
continue
|
||||
read.book = item.book
|
||||
read.user = user
|
||||
read.save()
|
||||
|
||||
if job.include_reviews and (item.rating or item.review) and not item.linked_review:
|
||||
# we don't know the publication date of the review,
|
||||
# but "now" is a bad guess
|
||||
published_date_guess = item.date_read or item.date_added
|
||||
if item.review:
|
||||
# pylint: disable=consider-using-f-string
|
||||
review_title = "Review of {!r} on {!r}".format(
|
||||
item.book.title,
|
||||
job.source,
|
||||
)
|
||||
review = Review.objects.filter(
|
||||
user=user,
|
||||
book=item.book,
|
||||
name=review_title,
|
||||
rating=item.rating,
|
||||
published_date=published_date_guess,
|
||||
).first()
|
||||
if not review:
|
||||
review = Review(
|
||||
user=user,
|
||||
book=item.book,
|
||||
name=review_title,
|
||||
content=item.review,
|
||||
rating=item.rating,
|
||||
published_date=published_date_guess,
|
||||
privacy=job.privacy,
|
||||
)
|
||||
review.save(software="bookwyrm", priority=LOW)
|
||||
else:
|
||||
# just a rating
|
||||
review = ReviewRating.objects.filter(
|
||||
user=user,
|
||||
book=item.book,
|
||||
published_date=published_date_guess,
|
||||
rating=item.rating,
|
||||
).first()
|
||||
if not review:
|
||||
review = ReviewRating(
|
||||
user=user,
|
||||
book=item.book,
|
||||
rating=item.rating,
|
||||
published_date=published_date_guess,
|
||||
privacy=job.privacy,
|
||||
)
|
||||
review.save(software="bookwyrm", priority=LOW)
|
||||
|
||||
# only broadcast this review to other bookwyrm instances
|
||||
item.linked_review = review
|
||||
item.save()
|
||||
|
|
|
@ -97,6 +97,6 @@ class Import(View):
|
|||
except (UnicodeDecodeError, ValueError, KeyError):
|
||||
return HttpResponseBadRequest(_("Not a valid csv file"))
|
||||
|
||||
importer.start_import(job)
|
||||
job.start_job()
|
||||
|
||||
return redirect(f"/import/{job.id}")
|
||||
|
|
|
@ -11,7 +11,7 @@ from django.views.decorators.http import require_POST
|
|||
|
||||
from bookwyrm import models
|
||||
from bookwyrm.importers import GoodreadsImporter
|
||||
from bookwyrm.importers.importer import import_item_task
|
||||
from bookwyrm.models.import_job import import_item_task
|
||||
from bookwyrm.settings import PAGE_LENGTH
|
||||
|
||||
# pylint: disable= no-self-use
|
||||
|
@ -74,3 +74,12 @@ def retry_item(request, job_id, item_id):
|
|||
)
|
||||
import_item_task.delay(item.id)
|
||||
return redirect("import-status", job_id)
|
||||
|
||||
|
||||
@login_required
|
||||
@require_POST
|
||||
def cancel_import(request, job_id):
|
||||
"""scrap that"""
|
||||
job = get_object_or_404(models.ImportJob, id=job_id, job__user=request.user)
|
||||
job.stop()
|
||||
return redirect("import-status", job_id)
|
||||
|
|
|
@ -9,7 +9,7 @@ from django.views import View
|
|||
from django.views.decorators.http import require_POST
|
||||
|
||||
from bookwyrm import models
|
||||
from bookwyrm.importers.importer import import_item_task
|
||||
from bookwyrm.models.import_job import import_item_task
|
||||
from bookwyrm.settings import PAGE_LENGTH
|
||||
|
||||
# pylint: disable= no-self-use
|
||||
|
|
|
@ -52,5 +52,5 @@ class ImportTroubleshoot(View):
|
|||
job,
|
||||
items,
|
||||
)
|
||||
importer.start_import(job)
|
||||
job.start_job()
|
||||
return redirect(f"/import/{job.id}")
|
||||
|
|
Loading…
Reference in a new issue