Consolidate BookwyrmExportJob into two tasks

Creating the export JSON and export TAR are now the only two tasks.
This commit is contained in:
Bart Schuurmans 2024-03-26 16:18:30 +01:00
parent ed2e9e5ea8
commit 9685ae5a0a

View file

@ -7,20 +7,19 @@ from boto3.session import Session as BotoSession
from s3_tar import S3Tar from s3_tar import S3Tar
from storages.backends.s3boto3 import S3Boto3Storage from storages.backends.s3boto3 import S3Boto3Storage
from django.db.models import CASCADE, BooleanField, FileField, ForeignKey, JSONField from django.db.models import BooleanField, FileField, JSONField
from django.db.models import Q from django.db.models import Q
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
from django.core.files.base import ContentFile from django.core.files.base import ContentFile
from django.utils import timezone
from django.utils.module_loading import import_string from django.utils.module_loading import import_string
from bookwyrm import settings from bookwyrm import settings
from bookwyrm.models import AnnualGoal, ReadThrough, ShelfBook, List, ListItem from bookwyrm.models import AnnualGoal, ReadThrough, ShelfBook, ListItem
from bookwyrm.models import Review, Comment, Quotation from bookwyrm.models import Review, Comment, Quotation
from bookwyrm.models import Edition from bookwyrm.models import Edition
from bookwyrm.models import UserFollows, User, UserBlocks from bookwyrm.models import UserFollows, User, UserBlocks
from bookwyrm.models.job import ParentJob, ChildJob, ParentTask from bookwyrm.models.job import ParentJob
from bookwyrm.tasks import app, IMPORTS from bookwyrm.tasks import app, IMPORTS
from bookwyrm.utils.tar import BookwyrmTarFile from bookwyrm.utils.tar import BookwyrmTarFile
@ -49,40 +48,12 @@ class BookwyrmExportJob(ParentJob):
json_completed = BooleanField(default=False) json_completed = BooleanField(default=False)
def start_job(self): def start_job(self):
"""Start the job""" """schedule the first task"""
task = start_export_task.delay(job_id=self.id, no_children=False) task = create_export_json_task.delay(job_id=self.id)
self.task_id = task.id self.task_id = task.id
self.save(update_fields=["task_id"]) self.save(update_fields=["task_id"])
def notify_child_job_complete(self):
"""let the job know when the items get work done"""
if self.complete:
return
self.updated_date = timezone.now()
self.save(update_fields=["updated_date"])
if not self.complete and self.has_completed:
if not self.json_completed:
try:
self.json_completed = True
self.save(update_fields=["json_completed"])
tar_job = AddFileToTar.objects.create(
parent_job=self, parent_export_job=self
)
tar_job.start_job()
except Exception as err: # pylint: disable=broad-except
logger.exception("job %s failed with error: %s", self.id, err)
tar_job.set_status("failed")
self.stop_job(reason="failed")
else:
self.complete_job()
def url2relativepath(url: str) -> str: def url2relativepath(url: str) -> str:
"""turn an absolute URL into a relative filesystem path""" """turn an absolute URL into a relative filesystem path"""
@ -90,135 +61,47 @@ def url2relativepath(url: str) -> str:
return unquote(parsed.path[1:]) return unquote(parsed.path[1:])
class AddBookToUserExportJob(ChildJob): @app.task(queue=IMPORTS)
"""append book metadata for each book in an export""" def create_export_json_task(job_id):
"""create the JSON data for the export"""
edition = ForeignKey(Edition, on_delete=CASCADE) job = BookwyrmExportJob.objects.get(id=job_id)
# don't start the job if it was stopped from the UI
if job.complete:
return
# pylint: disable=too-many-locals
def start_job(self):
"""Start the job"""
try: try:
job.set_status("active")
book = {} # generate JSON structure
book["work"] = self.edition.parent_work.to_activity() job.export_json = export_json(job.user)
book["edition"] = self.edition.to_activity() job.save(update_fields=["export_json"])
if book["edition"].get("cover"):
book["edition"]["cover"]["url"] = url2relativepath(
book["edition"]["cover"]["url"]
)
# authors
book["authors"] = []
for author in self.edition.authors.all():
book["authors"].append(author.to_activity())
# Shelves this book is on
# Every ShelfItem is this book so we don't other serializing
book["shelves"] = []
shelf_books = (
ShelfBook.objects.select_related("shelf")
.filter(user=self.parent_job.user, book=self.edition)
.distinct()
)
for shelfbook in shelf_books:
book["shelves"].append(shelfbook.shelf.to_activity())
# Lists and ListItems
# ListItems include "notes" and "approved" so we need them
# even though we know it's this book
book["lists"] = []
list_items = ListItem.objects.filter(
book=self.edition, user=self.parent_job.user
).distinct()
for item in list_items:
list_info = item.book_list.to_activity()
list_info[
"privacy"
] = item.book_list.privacy # this isn't serialized so we add it
list_info["list_item"] = item.to_activity()
book["lists"].append(list_info)
# Statuses
# Can't use select_subclasses here because
# we need to filter on the "book" value,
# which is not available on an ordinary Status
for status in ["comments", "quotations", "reviews"]:
book[status] = []
comments = Comment.objects.filter(
user=self.parent_job.user, book=self.edition
).all()
for status in comments:
obj = status.to_activity()
obj["progress"] = status.progress
obj["progress_mode"] = status.progress_mode
book["comments"].append(obj)
quotes = Quotation.objects.filter(
user=self.parent_job.user, book=self.edition
).all()
for status in quotes:
obj = status.to_activity()
obj["position"] = status.position
obj["endposition"] = status.endposition
obj["position_mode"] = status.position_mode
book["quotations"].append(obj)
reviews = Review.objects.filter(
user=self.parent_job.user, book=self.edition
).all()
for status in reviews:
obj = status.to_activity()
book["reviews"].append(obj)
# readthroughs can't be serialized to activity
book_readthroughs = (
ReadThrough.objects.filter(user=self.parent_job.user, book=self.edition)
.distinct()
.values()
)
book["readthroughs"] = list(book_readthroughs)
self.parent_job.export_json["books"].append(book)
self.parent_job.save(update_fields=["export_json"])
self.complete_job()
# create archive in separate task
create_archive_task.delay(job_id=job.id)
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
logger.exception( logger.exception(
"AddBookToUserExportJob %s Failed with error: %s", self.id, err "create_export_json_task for %s failed with error: %s", job, err
) )
self.set_status("failed") job.set_status("failed")
class AddFileToTar(ChildJob): @app.task(queue=IMPORTS)
"""add files to export""" def create_archive_task(job_id):
"""create the archive containing the JSON file and additional files"""
parent_export_job = ForeignKey( job = BookwyrmExportJob.objects.get(id=job_id)
BookwyrmExportJob, on_delete=CASCADE, related_name="child_edition_export_jobs"
)
def start_job(self): # don't start the job if it was stopped from the UI
"""Start the job""" if job.complete:
return
# NOTE we are doing this all in one big job,
# which has the potential to block a thread
# This is because we need to refer to the same s3_job
# or BookwyrmTarFile whilst writing
# Using a series of jobs in a loop would be better
try: try:
export_job = self.parent_export_job export_task_id = job.task_id
export_task_id = str(export_job.task_id) export_json_bytes = DjangoJSONEncoder().encode(job.export_json).encode("utf-8")
export_json_bytes = ( user = job.user
DjangoJSONEncoder().encode(export_job.export_json).encode("utf-8")
)
user = export_job.user
editions = get_books_for_user(user) editions = get_books_for_user(user)
if settings.USE_S3: if settings.USE_S3:
@ -252,15 +135,15 @@ class AddFileToTar(ChildJob):
# Create archive and store file name # Create archive and store file name
s3_tar.tar() s3_tar.tar()
export_job.export_data = s3_archive_path job.export_data = s3_archive_path
export_job.save(update_fields=["export_data"]) job.save(update_fields=["export_data"])
# Delete temporary files # Delete temporary files
S3Boto3Storage.delete(storage, export_json_tmp_file) S3Boto3Storage.delete(storage, export_json_tmp_file)
else: else:
export_job.export_data = f"{export_task_id}.tar.gz" job.export_data = f"{export_task_id}.tar.gz"
with export_job.export_data.open("wb") as tar_file: with job.export_data.open("wb") as tar_file:
with BookwyrmTarFile.open(mode="w:gz", fileobj=tar_file) as tar: with BookwyrmTarFile.open(mode="w:gz", fileobj=tar_file) as tar:
# save json file # save json file
tar.write_bytes(export_json_bytes) tar.write_bytes(export_json_bytes)
@ -272,161 +155,149 @@ class AddFileToTar(ChildJob):
for edition in editions: for edition in editions:
if edition.cover: if edition.cover:
tar.add_image(edition.cover, directory="images/") tar.add_image(edition.cover, directory="images/")
export_job.save(update_fields=["export_data"]) job.save(update_fields=["export_data"])
self.complete_job() job.set_status("completed")
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
logger.exception("AddFileToTar %s Failed with error: %s", self.id, err) logger.exception("create_archive_task for %s failed with error: %s", job, err)
self.stop_job(reason="failed")
self.parent_job.stop_job(reason="failed")
@app.task(queue=IMPORTS, base=ParentTask)
def start_export_task(**kwargs):
"""trigger the child tasks for user export"""
job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
# don't start the job if it was stopped from the UI
if job.complete:
return
try:
# prepare the initial file and base json
job.export_json = job.user.to_activity()
job.save(update_fields=["export_json"])
# let's go
json_export.delay(job_id=job.id, job_user=job.user.id, no_children=False)
except Exception as err: # pylint: disable=broad-except
logger.exception("User Export Job %s Failed with error: %s", job.id, err)
job.set_status("failed") job.set_status("failed")
@app.task(queue=IMPORTS, base=ParentTask) def export_json(user: User):
def export_saved_lists_task(**kwargs): """create export JSON"""
"""add user saved lists to export JSON""" data = export_user(user) # in the root of the JSON structure
data["settings"] = export_settings(user)
job = BookwyrmExportJob.objects.get(id=kwargs["job_id"]) data["goals"] = export_goals(user)
saved_lists = List.objects.filter(id__in=job.user.saved_lists.all()).distinct() data["books"] = export_books(user)
job.export_json["saved_lists"] = [l.remote_id for l in saved_lists] data["saved_lists"] = export_saved_lists(user)
job.save(update_fields=["export_json"]) data["follows"] = export_follows(user)
data["blocks"] = export_blocks(user)
return data
@app.task(queue=IMPORTS, base=ParentTask) def export_user(user: User):
def export_follows_task(**kwargs): """export user data"""
"""add user follows to export JSON""" data = user.to_activity()
data["icon"]["url"] = (
job = BookwyrmExportJob.objects.get(id=kwargs["job_id"]) url2relativepath(data["icon"]["url"]) if data.get("icon", False) else {}
follows = UserFollows.objects.filter(user_subject=job.user).distinct()
following = User.objects.filter(userfollows_user_object__in=follows).distinct()
job.export_json["follows"] = [f.remote_id for f in following]
job.save(update_fields=["export_json"])
@app.task(queue=IMPORTS, base=ParentTask)
def export_blocks_task(**kwargs):
"""add user blocks to export JSON"""
job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
blocks = UserBlocks.objects.filter(user_subject=job.user).distinct()
blocking = User.objects.filter(userblocks_user_object__in=blocks).distinct()
job.export_json["blocks"] = [b.remote_id for b in blocking]
job.save(update_fields=["export_json"])
@app.task(queue=IMPORTS, base=ParentTask)
def export_reading_goals_task(**kwargs):
"""add user reading goals to export JSON"""
job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
reading_goals = AnnualGoal.objects.filter(user=job.user).distinct()
job.export_json["goals"] = []
for goal in reading_goals:
job.export_json["goals"].append(
{"goal": goal.goal, "year": goal.year, "privacy": goal.privacy}
) )
job.save(update_fields=["export_json"]) return data
@app.task(queue=IMPORTS, base=ParentTask) def export_settings(user: User):
def json_export(**kwargs): """Additional settings - can't be serialized as AP"""
"""Generate an export for a user"""
try:
job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
job.set_status("active")
job_id = kwargs["job_id"]
if not job.export_json.get("icon"):
job.export_json["icon"] = {}
else:
job.export_json["icon"]["url"] = url2relativepath(
job.export_json["icon"]["url"]
)
# Additional settings - can't be serialized as AP
vals = [ vals = [
"show_goal", "show_goal",
"preferred_timezone", "preferred_timezone",
"default_post_privacy", "default_post_privacy",
"show_suggested_users", "show_suggested_users",
] ]
job.export_json["settings"] = {} return {k: getattr(user, k) for k in vals}
for k in vals:
job.export_json["settings"][k] = getattr(job.user, k)
job.export_json["books"] = []
# save settings we just updated def export_saved_lists(user: User):
job.save(update_fields=["export_json"]) """add user saved lists to export JSON"""
return [l.remote_id for l in user.saved_lists.all()]
# trigger subtasks
export_saved_lists_task.delay(job_id=job_id, no_children=False)
export_follows_task.delay(job_id=job_id, no_children=False)
export_blocks_task.delay(job_id=job_id, no_children=False)
trigger_books_jobs.delay(job_id=job_id, no_children=False)
except Exception as err: # pylint: disable=broad-except def export_follows(user: User):
logger.exception( """add user follows to export JSON"""
"json_export task in job %s Failed with error: %s", follows = UserFollows.objects.filter(user_subject=user).distinct()
job.id, following = User.objects.filter(userfollows_user_object__in=follows).distinct()
err, return [f.remote_id for f in following]
def export_blocks(user: User):
"""add user blocks to export JSON"""
blocks = UserBlocks.objects.filter(user_subject=user).distinct()
blocking = User.objects.filter(userblocks_user_object__in=blocks).distinct()
return [b.remote_id for b in blocking]
def export_goals(user: User):
"""add user reading goals to export JSON"""
reading_goals = AnnualGoal.objects.filter(user=user).distinct()
return [
{"goal": goal.goal, "year": goal.year, "privacy": goal.privacy}
for goal in reading_goals
]
def export_books(user: User):
"""add books to export JSON"""
editions = get_books_for_user(user)
return [export_book(user, edition) for edition in editions]
def export_book(user: User, edition: Edition):
"""add book to export JSON"""
data = {}
data["work"] = edition.parent_work.to_activity()
data["edition"] = edition.to_activity()
if data["edition"].get("cover"):
data["edition"]["cover"]["url"] = url2relativepath(
data["edition"]["cover"]["url"]
) )
job.set_status("failed")
# authors
data["authors"] = [author.to_activity() for author in edition.authors.all()]
@app.task(queue=IMPORTS, base=ParentTask) # Shelves this book is on
def trigger_books_jobs(**kwargs): # Every ShelfItem is this book so we don't other serializing
"""trigger tasks to get data for each book""" shelf_books = (
ShelfBook.objects.select_related("shelf")
try: .filter(user=user, book=edition)
job = BookwyrmExportJob.objects.get(id=kwargs["job_id"]) .distinct()
editions = get_books_for_user(job.user)
if len(editions) == 0:
job.notify_child_job_complete()
return
for edition in editions:
try:
edition_job = AddBookToUserExportJob.objects.create(
edition=edition, parent_job=job
) )
edition_job.start_job() data["shelves"] = [shelfbook.shelf.to_activity() for shelfbook in shelf_books]
except Exception as err: # pylint: disable=broad-except
logger.exception(
"AddBookToUserExportJob %s Failed with error: %s",
edition_job.id,
err,
)
edition_job.set_status("failed")
except Exception as err: # pylint: disable=broad-except # Lists and ListItems
logger.exception("trigger_books_jobs %s Failed with error: %s", job.id, err) # ListItems include "notes" and "approved" so we need them
job.set_status("failed") # even though we know it's this book
list_items = ListItem.objects.filter(book=edition, user=user).distinct()
data["lists"] = []
for item in list_items:
list_info = item.book_list.to_activity()
list_info[
"privacy"
] = item.book_list.privacy # this isn't serialized so we add it
list_info["list_item"] = item.to_activity()
data["lists"].append(list_info)
# Statuses
# Can't use select_subclasses here because
# we need to filter on the "book" value,
# which is not available on an ordinary Status
for status in ["comments", "quotations", "reviews"]:
data[status] = []
comments = Comment.objects.filter(user=user, book=edition).all()
for status in comments:
obj = status.to_activity()
obj["progress"] = status.progress
obj["progress_mode"] = status.progress_mode
data["comments"].append(obj)
quotes = Quotation.objects.filter(user=user, book=edition).all()
for status in quotes:
obj = status.to_activity()
obj["position"] = status.position
obj["endposition"] = status.endposition
obj["position_mode"] = status.position_mode
data["quotations"].append(obj)
reviews = Review.objects.filter(user=user, book=edition).all()
data["reviews"] = [status.to_activity() for status in reviews]
# readthroughs can't be serialized to activity
book_readthroughs = (
ReadThrough.objects.filter(user=user, book=edition).distinct().values()
)
data["readthroughs"] = list(book_readthroughs)
return data
def get_books_for_user(user): def get_books_for_user(user):