initial work on fixing user exports with s3

- custom storages
- tar.gz within bucket using s3_tar (see the sketch below)
- slightly changes export directory structure
- major problems still outstanding re delivering s3 files to end users
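
For context on the s3_tar bullet: the library is designed to assemble a tar/tar.gz from objects already in a bucket, without first downloading them to the application server. A minimal usage sketch under that assumption; the bucket and key names here are illustrative, not from this commit:

    from s3_tar import S3Tar

    # Illustrative names; the diff below targets "exports/<task_id>.tar.gz"
    # in the instance's storage bucket.
    job = S3Tar("my-bookwyrm-bucket", "exports/example.tar.gz")
    job.add_file("exports/archive.json")                # one key at a time
    job.add_file("images/avatar.png", folder="avatar")  # lands under avatar/ in the tar
    job.tar()  # assemble the .tar.gz inside the bucket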
This commit is contained in:
Hugh Rundle 2024-01-14 12:14:44 +11:00
parent a4599d0374
commit cbd08127ef
No known key found for this signature in database
GPG key ID: A7E35779918253F9
9 changed files with 408 additions and 155 deletions


@@ -0,0 +1,53 @@
# Generated by Django 3.2.23 on 2024-01-14 00:55

import bookwyrm.storage_backends
import django.core.serializers.json
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

    dependencies = [
        ('bookwyrm', '0191_merge_20240102_0326'),
    ]

    operations = [
        migrations.AddField(
            model_name='bookwyrmexportjob',
            name='export_json',
            field=models.JSONField(encoder=django.core.serializers.json.DjangoJSONEncoder, null=True),
        ),
        migrations.AddField(
            model_name='bookwyrmexportjob',
            name='json_completed',
            field=models.BooleanField(default=False),
        ),
        migrations.AlterField(
            model_name='bookwyrmexportjob',
            name='export_data',
            field=models.FileField(null=True, storage=bookwyrm.storage_backends.ExportsFileStorage, upload_to=''),
        ),
        migrations.CreateModel(
            name='AddFileToTar',
            fields=[
                ('childjob_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='bookwyrm.childjob')),
                ('parent_export_job', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='child_edition_export_jobs', to='bookwyrm.bookwyrmexportjob')),
            ],
            options={
                'abstract': False,
            },
            bases=('bookwyrm.childjob',),
        ),
        migrations.CreateModel(
            name='AddBookToUserExportJob',
            fields=[
                ('childjob_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='bookwyrm.childjob')),
                ('edition', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='bookwyrm.edition')),
            ],
            options={
                'abstract': False,
            },
            bases=('bookwyrm.childjob',),
        ),
    ]
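
The two new fields also make it easy to check on an in-flight export from a Django shell; a minimal sketch, where the job id and the import path are assumptions for illustration:

    # Run inside `python manage.py shell`; `1` stands in for a real job id,
    # and the import path is assumed from this commit's app layout.
    from bookwyrm.models.bookwyrm_export_job import BookwyrmExportJob

    job = BookwyrmExportJob.objects.get(id=1)
    print(job.json_completed)     # True once all JSON has been collected
    print(bool(job.export_data))  # True once the export file exists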


@@ -2,94 +2,347 @@
import dataclasses
import logging
import boto3
from s3_tar import S3Tar
from uuid import uuid4

from django.db.models import CASCADE, BooleanField, FileField, ForeignKey, JSONField
from django.db.models import Q
from django.core.serializers.json import DjangoJSONEncoder
from django.core.files.base import ContentFile, File
from django.utils import timezone

from bookwyrm import settings, storage_backends
from bookwyrm.models import AnnualGoal, ReadThrough, ShelfBook, List, ListItem
from bookwyrm.models import Review, Comment, Quotation
from bookwyrm.models import Edition
from bookwyrm.models import UserFollows, User, UserBlocks
from bookwyrm.models.job import ParentJob, ChildJob, ParentTask, SubTask
from bookwyrm.tasks import app, IMPORTS
from bookwyrm.utils.tar import BookwyrmTarFile

logger = logging.getLogger(__name__)


class BookwyrmExportJob(ParentJob):
    """entry for a specific request to export a bookwyrm user"""

    if settings.USE_S3:
        storage = storage_backends.ExportsS3Storage
    else:
        storage = storage_backends.ExportsFileStorage

    # use the custom exports storage backend
    export_data = FileField(null=True, storage=storage)
    export_json = JSONField(null=True, encoder=DjangoJSONEncoder)
    json_completed = BooleanField(default=False)

    def start_job(self):
        """Start the job"""

        task = start_export_task.delay(job_id=self.id, no_children=False)
        self.task_id = task.id
        self.save(update_fields=["task_id"])

    def notify_child_job_complete(self):
        """let the job know when the items get work done"""

        if self.complete:
            return

        self.updated_date = timezone.now()
        self.save(update_fields=["updated_date"])

        if not self.complete and self.has_completed:
            if not self.json_completed:
                # the JSON phase has just finished: add the JSON file to the tar
                tar_job = AddFileToTar.objects.create(
                    parent_job=self, parent_export_job=self
                )
                try:
                    self.json_completed = True
                    self.save(update_fields=["json_completed"])
                    tar_job.start_job()

                except Exception as err:  # pylint: disable=broad-except
                    logger.exception("job %s failed with error: %s", self.id, err)
                    tar_job.set_status("failed")
                    self.stop_job(reason="failed")

            else:
                self.complete_job()
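
The parent/child bookkeeping above follows a simple fan-in pattern: every finished child pings the parent, and the last one to report triggers the next phase. A toy, framework-free sketch of that pattern; all names here are illustrative, not BookWyrm APIs:

    # Toy fan-in: children report back to the parent; when the last child
    # finishes, the parent advances to the next phase. Illustrative only.
    class Parent:
        def __init__(self, n_children):
            self.pending = n_children
            self.phase = "exporting json"

        def notify_child_complete(self):
            self.pending -= 1
            if self.pending == 0:
                # last child done: e.g. start building the tarball
                self.phase = "building tar"

    parent = Parent(n_children=3)
    for _ in range(3):
        parent.notify_child_complete()
    assert parent.phase == "building tar"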
class AddBookToUserExportJob(ChildJob):
    """append book metadata for each book in an export"""

    edition = ForeignKey(Edition, on_delete=CASCADE)

    def start_job(self):
        """Start the job"""
        try:
            book = {}
            book["work"] = self.edition.parent_work.to_activity()
            book["edition"] = self.edition.to_activity()

            if book["edition"].get("cover"):
                # change the URL to be relative to the JSON file
                filename = book["edition"]["cover"]["url"].rsplit("/", maxsplit=1)[-1]
                book["edition"]["cover"]["url"] = f"covers/{filename}"

            # authors
            book["authors"] = []
            for author in self.edition.authors.all():
                book["authors"].append(author.to_activity())

            # Shelves this book is on
            # Every ShelfItem is this book so we don't bother serializing it
            book["shelves"] = []
            shelf_books = (
                ShelfBook.objects.select_related("shelf")
                .filter(user=self.parent_job.user, book=self.edition)
                .distinct()
            )
            for shelfbook in shelf_books:
                book["shelves"].append(shelfbook.shelf.to_activity())

            # Lists and ListItems
            # ListItems include "notes" and "approved" so we need them
            # even though we know it's this book
            book["lists"] = []
            list_items = ListItem.objects.filter(
                book=self.edition, user=self.parent_job.user
            ).distinct()

            for item in list_items:
                list_info = item.book_list.to_activity()
                list_info[
                    "privacy"
                ] = item.book_list.privacy  # this isn't serialized so we add it
                list_info["list_item"] = item.to_activity()
                book["lists"].append(list_info)

            # Statuses
            # Can't use select_subclasses here because
            # we need to filter on the "book" value,
            # which is not available on an ordinary Status
            for status in ["comments", "quotations", "reviews"]:
                book[status] = []

            comments = Comment.objects.filter(
                user=self.parent_job.user, book=self.edition
            ).all()
            for status in comments:
                obj = status.to_activity()
                obj["progress"] = status.progress
                obj["progress_mode"] = status.progress_mode
                book["comments"].append(obj)

            quotes = Quotation.objects.filter(
                user=self.parent_job.user, book=self.edition
            ).all()
            for status in quotes:
                obj = status.to_activity()
                obj["position"] = status.position
                obj["endposition"] = status.endposition
                obj["position_mode"] = status.position_mode
                book["quotations"].append(obj)

            reviews = Review.objects.filter(
                user=self.parent_job.user, book=self.edition
            ).all()
            for status in reviews:
                obj = status.to_activity()
                book["reviews"].append(obj)

            # readthroughs can't be serialized to activity
            book_readthroughs = (
                ReadThrough.objects.filter(user=self.parent_job.user, book=self.edition)
                .distinct()
                .values()
            )
            book["readthroughs"] = list(book_readthroughs)

            self.parent_job.export_json["books"].append(book)
            self.parent_job.save(update_fields=["export_json"])
            self.complete_job()

        except Exception as err:  # pylint: disable=broad-except
            logger.exception(
                "AddBookToUserExportJob %s Failed with error: %s", self.id, err
            )
            self.set_status("failed")
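
For orientation, the per-book entry assembled above ends up shaped roughly like this. The keys are taken from the code; every value is an invented placeholder:

    import json

    # Illustrative shape only - all values below are made-up placeholders.
    book = {
        "work": {"type": "Work", "title": "..."},
        "edition": {"type": "Edition", "cover": {"url": "covers/example.jpg"}},
        "authors": [{"type": "Author", "name": "..."}],
        "shelves": [{"type": "Shelf", "name": "Read"}],
        "lists": [{"name": "...", "privacy": "public", "list_item": {}}],
        "comments": [{"content": "...", "progress": 100, "progress_mode": "PG"}],
        "quotations": [{"position": 1, "endposition": 2, "position_mode": "PG"}],
        "reviews": [{"rating": 5}],
        "readthroughs": [{"start_date": None, "finish_date": None}],
    }
    print(json.dumps(book, indent=2))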
class AddFileToTar(ChildJob):
    """add files to export"""

    parent_export_job = ForeignKey(
        BookwyrmExportJob, on_delete=CASCADE, related_name="child_edition_export_jobs"
    )  # TODO: do we actually need this? Does self.parent_job.export_data work?

    def start_job(self):
        """Start the job"""

        # NOTE we are doing this all in one big job, which has the potential
        # to block a thread. This is because we need to refer to the same
        # s3_job or BookwyrmTarFile whilst writing. An alternative using a
        # series of jobs in a loop would be better, but Hugh couldn't make
        # that work.

        try:
            task_id = self.parent_export_job.task_id
            export_data = self.parent_export_job.export_data
            export_json = self.parent_export_job.export_json
            json_data = DjangoJSONEncoder().encode(export_json)
            user = self.parent_export_job.user
            editions = get_books_for_user(user)

            if settings.USE_S3:
                s3_job = S3Tar(
                    settings.AWS_STORAGE_BUCKET_NAME,
                    f"exports/{task_id}.tar.gz",
                )

                # TODO: either encrypt the file or we will need to get it to
                # the user from this secure part of the bucket
                export_data.save("archive.json", ContentFile(json_data.encode("utf-8")))

                s3_job.add_file(f"exports/{export_data.name}")
                s3_job.add_file(f"images/{user.avatar.name}", folder="avatar")
                for book in editions:
                    if getattr(book, "cover", False):
                        cover_name = f"images/{book.cover.name}"
                        s3_job.add_file(cover_name, folder="covers")
                s3_job.tar()

                # delete export json as soon as it's tarred
                # there is probably a better way to do this
                # Currently this merely makes the file empty
                export_data.delete(save=False)

            else:
                # TODO: is the export_data file open to the world?
                logger.info("export file URL: %s", export_data.url)

                export_data.open("wb")
                with BookwyrmTarFile.open(mode="w:gz", fileobj=export_data) as tar:
                    tar.write_bytes(json_data.encode("utf-8"))

                    # Add avatar image if present
                    if getattr(user, "avatar", False):
                        tar.add_image(
                            user.avatar, filename="avatar", directory="avatar/"
                        )  # TODO: does this work?

                    for book in editions:
                        if getattr(book, "cover", False):
                            tar.add_image(book.cover)

                export_data.close()

            self.complete_job()

        except Exception as err:  # pylint: disable=broad-except
            logger.exception("AddFileToTar %s Failed with error: %s", self.id, err)
            self.stop_job(reason="failed")
            self.parent_job.stop_job(reason="failed")
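
On the delivery problem flagged in the TODO above (and in the commit message): one possible approach, shown here as a hedged sketch rather than anything this commit implements, is to keep the archive private and hand the user a short-lived presigned URL via boto3:

    import boto3

    def presigned_export_url(bucket: str, key: str, expires: int = 3600) -> str:
        """Generate a time-limited download link for a private S3 object.

        Sketch only: assumes AWS credentials are configured in the
        environment, and that `key` is e.g. "exports/<task_id>.tar.gz"
        as written by S3Tar above.
        """
        s3_client = boto3.client("s3")
        return s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expires,  # seconds
        )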
@app.task(queue=IMPORTS, base=ParentTask)
def start_export_task(**kwargs):
    """trigger the child tasks for user export"""
    job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])

    # don't start the job if it was stopped from the UI
    if job.complete:
        return
    try:
        # This is where ChildJobs get made
        # prepare the initial file and base json
        job.export_data = ContentFile(b"", str(uuid4()))
        job.export_json = job.user.to_activity()
        job.save(update_fields=["export_data", "export_json"])

        # let's go
        json_export.delay(job_id=job.id, job_user=job.user.id, no_children=False)

    except Exception as err:  # pylint: disable=broad-except
        logger.exception("User Export Job %s Failed with error: %s", job.id, err)
        job.set_status("failed")


@app.task(queue=IMPORTS, base=ParentTask)
def export_saved_lists_task(**kwargs):
    """add user saved lists to export JSON"""

    job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
    saved_lists = List.objects.filter(id__in=job.user.saved_lists.all()).distinct()
    job.export_json["saved_lists"] = [l.remote_id for l in saved_lists]
    job.save(update_fields=["export_json"])


@app.task(queue=IMPORTS, base=ParentTask)
def export_follows_task(**kwargs):
    """add user follows to export JSON"""

    job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
    follows = UserFollows.objects.filter(user_subject=job.user).distinct()
    following = User.objects.filter(userfollows_user_object__in=follows).distinct()
    job.export_json["follows"] = [f.remote_id for f in following]
    job.save(update_fields=["export_json"])


@app.task(queue=IMPORTS, base=ParentTask)
def export_blocks_task(**kwargs):
    """add user blocks to export JSON"""

    job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
    blocks = UserBlocks.objects.filter(user_subject=job.user).distinct()
    blocking = User.objects.filter(userblocks_user_object__in=blocks).distinct()
    job.export_json["blocks"] = [b.remote_id for b in blocking]
    job.save(update_fields=["export_json"])


@app.task(queue=IMPORTS, base=ParentTask)
def export_reading_goals_task(**kwargs):
    """add user reading goals to export JSON"""

    job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
    reading_goals = AnnualGoal.objects.filter(user=job.user).distinct()
    job.export_json["goals"] = []
    for goal in reading_goals:
        job.export_json["goals"].append(
            {"goal": goal.goal, "year": goal.year, "privacy": goal.privacy}
        )
    job.save(update_fields=["export_json"])
@app.task(queue=IMPORTS, base=ParentTask)
def json_export(**kwargs):
    """Generate an export for a user"""

    job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
    job.set_status("active")
    job_id = kwargs["job_id"]

    # I don't love this but it prevents a JSON encoding error
    # when there is no user image
    if isinstance(
        job.export_json["icon"],
        dataclasses._MISSING_TYPE,  # pylint: disable=protected-access
    ):
        job.export_json["icon"] = {}
    else:
        # change the URL to be relative to the JSON file
        file_type = job.export_json["icon"]["url"].rsplit(".", maxsplit=1)[-1]
        filename = f"avatar.{file_type}"
        job.export_json["icon"]["url"] = filename

    # Additional settings - can't be serialized as AP
    vals = [
@@ -98,120 +351,45 @@ def json_export(
        "default_post_privacy",
        "show_suggested_users",
    ]
    job.export_json["settings"] = {}
    for k in vals:
        job.export_json["settings"][k] = getattr(job.user, k)

    job.export_json["books"] = []

    # save settings we just updated
    job.save(update_fields=["export_json"])

    # trigger subtasks
    export_saved_lists_task.delay(job_id=job_id, no_children=False)
    export_follows_task.delay(job_id=job_id, no_children=False)
    export_blocks_task.delay(job_id=job_id, no_children=False)
    trigger_books_jobs.delay(job_id=job_id, no_children=False)


@app.task(queue=IMPORTS, base=ParentTask)
def trigger_books_jobs(**kwargs):
    """trigger tasks to get data for each book"""

    try:
        job = BookwyrmExportJob.objects.get(id=kwargs["job_id"])
        editions = get_books_for_user(job.user)

        if len(editions) == 0:
            job.notify_child_job_complete()
            return

        for edition in editions:
            edition_job = AddBookToUserExportJob.objects.create(
                edition=edition, parent_job=job
            )
            try:
                edition_job.start_job()
            except Exception as err:  # pylint: disable=broad-except
                logger.exception(
                    "AddBookToUserExportJob %s Failed with error: %s",
                    edition_job.id,
                    err,
                )
                edition_job.set_status("failed")

    except Exception as err:  # pylint: disable=broad-except
        logger.exception("trigger_books_jobs %s Failed with error: %s", job.id, err)
        job.set_status("failed")

def get_books_for_user(user):
    """Get all the books and editions related to a user"""


@@ -135,8 +135,7 @@ class ParentJob(Job):
        )
        app.control.revoke(list(tasks))

        self.pending_child_jobs.update(status=self.Status.STOPPED)

    @property
    def has_completed(self):
@@ -248,7 +247,7 @@ class SubTask(app.Task):
    """

    def before_start(
        self, task_id, *args, **kwargs
    ):  # pylint: disable=no-self-use, unused-argument
        """Handler called before the task starts. Override.
@@ -272,7 +271,7 @@ class SubTask(app.Task):
        child_job.set_status(ChildJob.Status.ACTIVE)

    def on_success(
        self, retval, task_id, *args, **kwargs
    ):  # pylint: disable=no-self-use, unused-argument
        """Run by the worker if the task executes successfully. Override.


@@ -442,3 +442,4 @@ if HTTP_X_FORWARDED_PROTO:
# Do not change this setting unless you already have an existing
# user with the same username - in which case you should change it!
INSTANCE_ACTOR_USERNAME = "bookwyrm.instance.actor"
DATA_UPLOAD_MAX_MEMORY_SIZE = 1024**2 * 20  # 20MB - temporary fix whilst working on this


@@ -1,6 +1,7 @@
"""Handles backends for storages"""
import os
from tempfile import SpooledTemporaryFile
from django.core.files.storage import FileSystemStorage
from storages.backends.s3boto3 import S3Boto3Storage
from storages.backends.azure_storage import AzureStorage
@@ -61,3 +62,16 @@ class AzureImagesStorage(AzureStorage):  # pylint: disable=abstract-method

    location = "images"
    overwrite_files = False


class ExportsFileStorage(FileSystemStorage):  # pylint: disable=abstract-method
    """Storage class for exports contents with local files"""

    location = "exports"
    overwrite_files = False


class ExportsS3Storage(S3Boto3Storage):  # pylint: disable=abstract-method
    """Storage class for exports contents with S3"""

    location = "exports"
    default_acl = None
    overwrite_files = False
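
A side note on the `if settings.USE_S3` switch in the model earlier in this commit: because it runs at import time, the choice is baked in when the module loads. Django's documented alternative (since 3.1, so available on the 3.2 used here) is to pass a callable as `storage`, deferring the decision; a hedged sketch of that alternative, not what this commit does:

    # Sketch only: assumes it lives inside an installed Django app.
    from django.db import models

    def select_export_storage():
        """Pick the exports storage at runtime rather than at import time."""
        from bookwyrm import settings, storage_backends

        if settings.USE_S3:
            return storage_backends.ExportsS3Storage()
        return storage_backends.ExportsFileStorage()

    class ExampleModel(models.Model):
        # FileField accepts a callable for `storage`; migrations then
        # serialize a reference to the function, not a fixed backend
        export_data = models.FileField(null=True, storage=select_export_storage)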


@@ -9,7 +9,7 @@ from django.utils.translation import gettext_lazy as _
from django.templatetags.static import static

from bookwyrm.models import User
from bookwyrm.settings import INSTANCE_ACTOR_USERNAME, USE_S3

register = template.Library()
@@ -133,15 +133,22 @@ def get_file_size(file):
    """display the size of a file in human readable terms"""

    try:
        # TODO: this obviously isn't a proper solution
        # boto storages do not implement 'path'
        if not USE_S3:
            raw_size = os.stat(file.path).st_size
            if raw_size < 1024:
                return f"{raw_size} bytes"
            if raw_size < 1024**2:
                return f"{raw_size/1024:.2f} KB"
            if raw_size < 1024**3:
                return f"{raw_size/1024**2:.2f} MB"
            return f"{raw_size/1024**3:.2f} GB"
        return ""
    except Exception as e:  # pylint: disable=broad-except
        print(e)
        return ""

Binary file not shown.

Binary file not shown.


@@ -35,6 +35,7 @@ opentelemetry-sdk==1.16.0
protobuf==3.20.*
pyotp==2.8.0
qrcode==7.3.1
s3-tar==0.1.13

# Dev
pytest-django==4.1.0