moviewyrm/fedireads/connectors/openlibrary.py

228 lines
7.9 KiB
Python
Raw Normal View History

2020-03-07 20:22:28 +00:00
''' openlibrary data connector '''
import re
import requests
2020-04-22 13:53:22 +00:00
from django.core.files.base import ContentFile
from django.db import transaction
2020-03-07 20:22:28 +00:00
from fedireads import models
2020-03-30 00:40:51 +00:00
from .abstract_connector import AbstractConnector, SearchResult
from .abstract_connector import update_from_mappings
from .abstract_connector import get_date, get_data
2020-03-30 20:15:49 +00:00
from .openlibrary_languages import languages
2020-03-07 20:22:28 +00:00
class Connector(AbstractConnector):
    ''' connector that loads book and author records from OpenLibrary '''

    def __init__(self, identifier):
        ''' define the OpenLibrary field mappings, then run the base setup.

        Each mapping entry is written as
        `source_key: (target_name, formatter)`; a formatter of None is
        given where no conversion function is supplied.
        NOTE(review): the mappings are consumed outside this class
        (update_from_mappings / AbstractConnector) -- confirm the exact
        contract there.
        '''
        def get_first(values):
            ''' take the first item of a list-valued field '''
            return values[0]

        self.key_mappings = {
            'isbn_13': ('isbn_13', get_first),
            'isbn_10': ('isbn_10', get_first),
            'oclc_numbers': ('oclc_number', get_first),
            'lccn': ('lccn', get_first),
        }
        # book mappings are the identifier mappings plus descriptive fields
        self.book_mappings = self.key_mappings.copy()
        self.book_mappings.update({
            'publish_date': ('published_date', get_date),
            'first_publish_date': ('first_published_date', get_date),
            'description': ('description', get_description),
            'languages': ('languages', get_languages),
            'number_of_pages': ('pages', None),
            'series': ('series', get_first),
        })
        # the mappings are assigned before the base initializer is called
        super().__init__(identifier)

    def format_search_result(self, doc):
        ''' turn one search result document into a SearchResult.

        Only the last path segment of the document key is kept, and the
        first listed author is used ('Unknown' when there is none).
        '''
        key = doc['key'].split('/')[-1]
        author = doc.get('author_name') or ['Unknown']
        return SearchResult(
            doc.get('title'),
            key,
            author[0],
            doc.get('first_publish_year'),
        )

    def parse_search_data(self, data):
        ''' pull the list of result documents out of a search response '''
        return data.get('docs')

    def get_or_create_book(self, olkey):
        ''' pull up a book record by whatever means possible.
        if you give a work key, it should give you the default edition,
        annotated with work data.

        Returns the stored book when one already has this key (for a work,
        its default edition); otherwise loads the data, creates both a work
        and an edition, and returns the edition. '''
        book = models.Book.objects.select_subclasses().filter(
            openlibrary_key=olkey
        ).first()
        if book:
            if isinstance(book, models.Work):
                return book.default_edition
            return book

        # no book was found, so we start creating a new one
        if re.match(r'^OL\d+W$', olkey):
            with transaction.atomic():
                # create both work and a default edition
                work_data = self.load_book_data(olkey)
                work = self.create_book(olkey, work_data, models.Work)

                edition_options = self.load_edition_data(olkey).get('entries')
                edition_data = pick_default_edition(edition_options)
                if not edition_data:
                    # hack: re-use the work data as the edition data
                    edition_data = work_data
                key = edition_data.get('key').split('/')[-1]
                edition = self.create_book(key, edition_data, models.Edition)
                edition.default = True
                edition.parent_work = work
                edition.save()
        else:
            with transaction.atomic():
                edition_data = self.load_book_data(olkey)
                edition = self.create_book(olkey, edition_data, models.Edition)

                work_data = edition_data.get('works')
                if not work_data:
                    # hack: we're re-using the edition data as the work data
                    work_key = olkey
                else:
                    work_key = work_data[0]['key'].split('/')[-1]

                work = models.Work.objects.filter(
                    openlibrary_key=work_key
                ).first()
                if not work:
                    work_data = self.load_book_data(work_key)
                    work = self.create_book(work_key, work_data, models.Work)
                edition.parent_work = work
                edition.save()

        # `authors` is a related manager (it is used through .set()/.all()),
        # and a manager object is always truthy, so ask the database whether
        # any authors are attached instead of testing the manager itself
        if not edition.authors.exists() and work.authors.exists():
            edition.authors.set(work.authors.all())
            # a manager is not iterable; go through .all()
            edition.author_text = ', '.join(
                a.name for a in edition.authors.all()
            )
            # the text is assigned after the saves above, so store it
            # NOTE(review): assumes author_text is a model field -- confirm
            edition.save()

        return edition

    def get_authors_from_data(self, data):
        ''' parse author json and load or create authors '''
        authors = []
        for author_blob in data.get('authors', []):
            # the key may sit on the entry itself or on a nested 'author'
            author_blob = author_blob.get('author', author_blob)
            # this id is "/authors/OL1234567A" and we want just "OL1234567A"
            author_id = author_blob['key'].split('/')[-1]
            authors.append(self.get_or_create_author(author_id))
        return authors

    def load_book_data(self, olkey):
        ''' query openlibrary for data on a book '''
        url = '%s/works/%s.json' % (self.books_url, olkey)
        return get_data(url)

    def load_edition_data(self, olkey):
        ''' query openlibrary for editions of a work '''
        url = '%s/works/%s/editions.json' % (self.books_url, olkey)
        return get_data(url)

    def expand_book_data(self, book):
        ''' create records for every edition of a book's work that is not
        stored yet. an edition is resolved to its parent work first. '''
        work = book
        if isinstance(book, models.Edition):
            work = book.parent_work

        edition_options = self.load_edition_data(work.openlibrary_key)
        # a response without 'entries' means there is nothing to add
        for edition_data in edition_options.get('entries') or []:
            olkey = edition_data.get('key').split('/')[-1]
            # skip editions that are already stored
            if models.Edition.objects.filter(openlibrary_key=olkey).exists():
                continue
            edition = self.create_book(olkey, edition_data, models.Edition)
            edition.parent_work = work
            edition.save()
            # related managers are always truthy, so check with .exists()
            if not edition.authors.exists() and work.authors.exists():
                edition.authors.set(work.authors.all())

    def get_or_create_author(self, olkey):
        ''' load that author, creating the record from openlibrary data
        when it is not stored yet.

        Raises ValueError when olkey does not look like "OL<digits>A". '''
        if not re.match(r'^OL\d+A$', olkey):
            raise ValueError('Invalid OpenLibrary author ID')
        try:
            return models.Author.objects.get(openlibrary_key=olkey)
        except models.Author.DoesNotExist:
            # not stored yet: fall through and build it from remote data
            pass

        url = '%s/authors/%s.json' % (self.base_url, olkey)
        data = get_data(url)

        author = models.Author(openlibrary_key=olkey)
        mappings = {
            'birth_date': ('born', get_date),
            'death_date': ('died', get_date),
            'bio': ('bio', get_description),
        }
        author = update_from_mappings(author, data, mappings)

        # TODO this is making some BOLD assumption
        # (the last space-separated token is treated as the last name)
        name = data.get('name')
        if name:
            *first_names, last_name = name.split(' ')
            author.last_name = last_name
            author.first_name = ' '.join(first_names)
        author.save()
        return author

    def get_cover_from_data(self, data):
        ''' ask openlibrary for the cover.

        Returns None when the data lists no covers, otherwise a
        [file name, ContentFile] pair for the first listed cover.
        Raises a requests exception when the download fails. '''
        if not data.get('covers'):
            return None

        cover_id = data.get('covers')[0]
        image_name = '%s-M.jpg' % cover_id
        url = '%s/b/id/%s' % (self.covers_url, image_name)
        # requests waits indefinitely unless a timeout is given
        response = requests.get(url, timeout=10)
        # no-op for successful responses, raises HTTPError otherwise
        response.raise_for_status()
        image_content = ContentFile(response.content)
        return [image_name, image_content]
2020-03-28 04:28:52 +00:00
def get_description(description_blob):
    ''' a description arrives either as a bare value or wrapped in a dict;
    for a dict, hand back whatever is stored under 'value' '''
    if not isinstance(description_blob, dict):
        # already a plain value, pass it through untouched
        return description_blob
    return description_blob.get('value')
2020-03-30 20:15:49 +00:00
def get_languages(language_blob):
    ''' /language/eng -> English

    Looks each entry's 'key' up in the languages table; an entry whose key
    is missing from the table contributes None. '''
    return [
        languages.get(entry.get('key', ''), None)
        for entry in language_blob
    ]
def pick_default_edition(options):
    ''' favor physical copies with covers in english

    options is a list of edition data dicts (or None / empty).
    Returns None when there is nothing to choose from, otherwise one of
    the given dicts. Each preference narrows the candidates only when at
    least one candidate satisfies it, so a choice is always made. '''
    if not options:
        return None
    if len(options) == 1:
        return options[0]

    formats = ('paperback', 'hardcover', 'mass market paperback')
    # ordered from most to least important
    preferences = (
        # cover ids live under 'covers', the key get_cover_from_data reads
        lambda e: e.get('covers'),
        lambda e: '/languages/eng' in str(e.get('languages')),
        lambda e: str(e.get('physical_format')).lower() in formats,
        lambda e: e.get('isbn_13'),
        lambda e: e.get('ocaid'),
    )
    for is_preferred in preferences:
        # keep the previous candidates when nothing matches this preference
        options = [e for e in options if is_preferred(e)] or options
    return options[0]