''' openlibrary data connector '''
import re
import requests

from django.core.files.base import ContentFile
from django.db import transaction

from fedireads import models
from .abstract_connector import AbstractConnector, SearchResult
from .abstract_connector import update_from_mappings
from .abstract_connector import get_date, get_data
from .openlibrary_languages import languages


class Connector(AbstractConnector):
    ''' instantiate a connector for OL '''

    def __init__(self, identifier):
        ''' set up the OpenLibrary-json-key -> model-field mappings.

        Each mapping value is a ``(model_field, formatter)`` pair; a
        formatter of ``None`` is passed through as-is to
        ``update_from_mappings`` (defined in abstract_connector).
        '''
        def get_first(values):
            ''' OL wraps most identifiers in a list; keep the first one.
            Raises IndexError when the list is empty. '''
            return values[0]

        # identifiers shared by every kind of book record
        self.key_mappings = {
            'isbn_13': ('isbn_13', get_first),
            'isbn_10': ('isbn_10', get_first),
            'oclc_numbers': ('oclc_number', get_first),
            'lccn': ('lccn', get_first),
        }

        # book records get the identifiers plus descriptive fields
        self.book_mappings = self.key_mappings.copy()
        self.book_mappings.update({
            'publish_date': ('published_date', get_date),
            'first_publish_date': ('first_published_date', get_date),
            'description': ('description', get_description),
            'languages': ('languages', get_languages),
            'number_of_pages': ('pages', None),
            'series': ('series', get_first),
        })
        # mappings are assigned before the parent constructor runs, as in
        # the original ordering
        super().__init__(identifier)


    def format_search_result(self, doc):
        ''' turn one search result document into a SearchResult.

        The key arrives as a path (e.g. "/works/OL123W"); only the last
        path segment is kept. Missing authors are shown as "Unknown".
        '''
        key = doc['key'].split('/')[-1]
        author = doc.get('author_name') or ['Unknown']
        return SearchResult(
            doc.get('title'),
            key,
            ', '.join(author),
            doc.get('first_publish_year'),
        )


    def parse_search_data(self, data):
        ''' the list of result documents lives under the "docs" key '''
        return data.get('docs')


    def get_or_create_book(self, olkey):
        ''' pull up a book record by whatever means possible.
        if you give a work key, it should give you the default edition,
        annotated with work data.

        Returns an edition: either one already in the database or one
        freshly created (together with its parent work) from OL data.
        '''
        book = models.Book.objects.select_subclasses().filter(
            openlibrary_key=olkey
        ).first()
        if book:
            if isinstance(book, models.Work):
                return book.default_edition
            return book

        # no book was found, so we start creating a new one
        if re.match(r'^OL\d+W$', olkey):
            with transaction.atomic():
                # create both work and a default edition
                work_data = self.load_book_data(olkey)
                work = self.create_book(olkey, work_data, models.Work)

                edition_options = self.load_edition_data(olkey).get('entries')
                edition_data = pick_default_edition(edition_options)
                if not edition_data:
                    # hack: re-use the work data as the edition data
                    edition_data = work_data
                key = edition_data.get('key').split('/')[-1]
                edition = self.create_book(key, edition_data, models.Edition)
                edition.default = True
                edition.parent_work = work
                edition.save()
        else:
            with transaction.atomic():
                edition_data = self.load_book_data(olkey)
                edition = self.create_book(olkey, edition_data, models.Edition)

                work_data = edition_data.get('works')
                if not work_data:
                    # hack: we're re-using the edition data as the work data
                    work_key = olkey
                else:
                    work_key = work_data[0]['key'].split('/')[-1]

                work = models.Work.objects.filter(
                    openlibrary_key=work_key
                ).first()
                if not work:
                    work_data = self.load_book_data(work_key)
                    work = self.create_book(work_key, work_data, models.Work)
                edition.parent_work = work
                edition.save()

        if _inherit_work_authors(edition, work):
            # related managers are not iterable; go through .all()
            # NOTE(review): author_text is only set on the returned object,
            # it is not saved here -- confirm whether it should be persisted
            edition.author_text = ', '.join(
                a.name for a in edition.authors.all()
            )

        return edition


    def get_authors_from_data(self, data):
        ''' parse author json and load or create authors '''
        authors = []
        for author_blob in data.get('authors', []):
            # the reference is either the blob itself or nested under
            # an "author" key
            author_blob = author_blob.get('author', author_blob)
            # this id is "/authors/OL1234567A" and we want just "OL1234567A"
            author_id = author_blob['key'].split('/')[-1]
            authors.append(self.get_or_create_author(author_id))
        return authors


    def load_book_data(self, olkey):
        ''' query openlibrary for data on a book '''
        url = '%s/works/%s.json' % (self.books_url, olkey)
        return get_data(url)


    def load_edition_data(self, olkey):
        ''' query openlibrary for editions of a work '''
        url = '%s/works/%s/editions.json' % (self.books_url, olkey)
        return get_data(url)


    def expand_book_data(self, book):
        ''' create every edition of a book's work that isn't stored yet.

        ``book`` may be a work or an edition; for an edition, its parent
        work is the one that gets expanded.
        '''
        work = book
        if isinstance(book, models.Edition):
            work = book.parent_work

        edition_options = self.load_edition_data(work.openlibrary_key)
        # a response without "entries" simply means nothing to add
        for edition_data in edition_options.get('entries') or []:
            olkey = edition_data.get('key').split('/')[-1]
            # skip editions that are already in the database
            if models.Edition.objects.filter(openlibrary_key=olkey).exists():
                continue
            edition = self.create_book(olkey, edition_data, models.Edition)
            edition.parent_work = work
            edition.save()
            _inherit_work_authors(edition, work)


    def get_or_create_author(self, olkey):
        ''' load that author

        Raises ValueError when ``olkey`` doesn't look like "OL<digits>A".
        '''
        if not re.match(r'^OL\d+A$', olkey):
            raise ValueError('Invalid OpenLibrary author ID')
        try:
            return models.Author.objects.get(openlibrary_key=olkey)
        except models.Author.DoesNotExist:
            pass

        url = '%s/authors/%s.json' % (self.base_url, olkey)
        data = get_data(url)

        author = models.Author(openlibrary_key=olkey)
        mappings = {
            'birth_date': ('born', get_date),
            'death_date': ('died', get_date),
            'bio': ('bio', get_description),
        }
        author = update_from_mappings(author, data, mappings)
        # TODO this is making some BOLD assumption
        name = data.get('name')
        if name:
            # everything before the last space is the first name, the final
            # word is the last name
            first_name, _, last_name = name.rpartition(' ')
            author.last_name = last_name
            author.first_name = first_name
        author.save()

        return author


    def get_cover_from_data(self, data):
        ''' ask openlibrary for the cover

        Returns ``[image_name, ContentFile]`` for the first listed cover,
        or None when the record lists no covers. Raises the requests
        HTTP error when the download is not successful.
        '''
        if not data.get('covers'):
            return None

        cover_id = data.get('covers')[0]
        image_name = '%s-M.jpg' % cover_id
        url = '%s/b/id/%s' % (self.covers_url, image_name)
        response = requests.get(url)
        # no-op for successful responses, raises for 4xx/5xx
        response.raise_for_status()
        image_content = ContentFile(response.content)
        return [image_name, image_content]


def _inherit_work_authors(edition, work):
    ''' give an author-less edition the authors of its work.

    A related manager is always truthy, so emptiness has to be checked
    with a query rather than with ``not edition.authors``.
    Returns True when authors were copied over.
    '''
    if edition.authors.exists() or not work.authors.exists():
        return False
    edition.authors.set(work.authors.all())
    return True


def get_description(description_blob):
    ''' descriptions can be a string or a dict '''
    if isinstance(description_blob, dict):
        return description_blob.get('value')
    return description_blob


def get_languages(language_blob):
    ''' look up each language reference's "key" in the languages table

    Entries whose key is not in the table come back as None.
    '''
    return [
        languages.get(lang.get('key', ''), None) for lang in language_blob
    ]


def pick_default_edition(options):
    ''' favor physical copies with covers in english

    Each filter only narrows the candidates when at least one of them
    matches; otherwise the previous candidate list is kept. Returns None
    when there are no options at all.
    '''
    if not options:
        return None
    if len(options) == 1:
        return options[0]

    # edition records list their images under "covers" (the same key
    # get_cover_from_data reads); "cover" is still accepted
    options = [
        e for e in options if e.get('covers') or e.get('cover')
    ] or options
    options = [
        e for e in options if '/languages/eng' in str(e.get('languages'))
    ] or options
    formats = ['paperback', 'hardcover', 'mass market paperback']
    options = [
        e for e in options
        if str(e.get('physical_format')).lower() in formats
    ] or options
    options = [e for e in options if e.get('isbn_13')] or options
    options = [e for e in options if e.get('ocaid')] or options
    return options[0]