''' openlibrary data connector '''
import re
import requests

from django.core.files.base import ContentFile

from fedireads import models
from .abstract_connector import AbstractConnector, SearchResult
from .abstract_connector import update_from_mappings
from .abstract_connector import get_date, get_data
from .openlibrary_languages import languages


class Connector(AbstractConnector):
    ''' instantiate a connector for OL '''

    def __init__(self, identifier):
        ''' set up the openlibrary -> local field mappings

        Each mapping value is a ``(name, formatter)`` tuple; presumably it
        is consumed by ``update_from_mappings`` as (local field, converter)
        -- confirm in abstract_connector.
        '''
        super().__init__(identifier)

        def get_first(values):
            ''' take the first item of a list-valued field '''
            return values[0]

        # identifier fields, kept separate so they can be reused on their own
        self.key_mappings = {
            'isbn_13': ('isbn_13', get_first),
            'isbn_10': ('isbn_10', get_first),
            'oclc_numbers': ('oclc_number', get_first),
            'lccn': ('lccn', get_first),
        }

        # book fields are the identifiers plus descriptive metadata
        self.book_mappings = self.key_mappings.copy()
        self.book_mappings.update({
            'publish_date': ('published_date', get_date),
            'first_publish_date': ('first_published_date', get_date),
            'description': ('description', get_description),
            'languages': ('languages', get_languages),
            'number_of_pages': ('pages', None),
            'series': ('series', get_first),
        })

    def is_work_data(self, data):
        ''' true when the blob's key ends in a work id ("OL<digits>W")

        Raises KeyError if ``data`` has no 'key' entry.
        '''
        return bool(re.match(r'^[\/\w]+OL\d+W$', data['key']))

    def get_edition_from_work_data(self, data):
        ''' fetch a work's editions and pick the default one

        Returns False when ``data`` has no 'key'; otherwise whatever
        ``pick_default_edition`` selects from the fetched 'entries'.
        '''
        try:
            key = data['key']
        except KeyError:
            return False
        # keys look like "/works/OL123W"; strip the leading slash so the
        # url doesn't end up with "//" after books_url
        url = '%s/%s/editions' % (self.books_url, key.lstrip('/'))
        data = get_data(url)
        return pick_default_edition(data['entries'])

    def get_work_from_edition_date(self, data):
        ''' fetch the first work referenced by an edition blob

        Returns False when the edition lists no work key.
        '''
        try:
            key = data['works'][0]['key']
        except (IndexError, KeyError):
            return False
        # same leading-slash normalisation as get_edition_from_work_data
        url = '%s/%s' % (self.books_url, key.lstrip('/'))
        return get_data(url)

    def get_authors_from_data(self, data):
        ''' parse author json and load or create authors

        Generator: yields one author per entry in ``data['authors']``.
        '''
        for author_blob in data.get('authors', []):
            # entries are either {'author': {'key': ...}} or {'key': ...}
            author_blob = author_blob.get('author', author_blob)
            # this id is "/authors/OL1234567A" and we want just "OL1234567A"
            author_id = author_blob['key'].split('/')[-1]
            yield self.get_or_create_author(author_id)

    def get_cover_from_data(self, data):
        ''' ask openlibrary for the cover

        Returns None when the blob lists no covers, otherwise
        ``[image_name, ContentFile]`` for the first cover id (size "M").
        Raises the ``requests`` HTTP error if the download fails.
        '''
        covers = data.get('covers')
        if not covers:
            return None

        image_name = '%s-M.jpg' % covers[0]
        url = '%s/b/id/%s' % (self.covers_url, image_name)
        response = requests.get(url)
        # no-op on success, raises on a 4xx/5xx response
        response.raise_for_status()
        image_content = ContentFile(response.content)
        return [image_name, image_content]

    def parse_search_data(self, data):
        ''' pull the list of result docs out of a search response '''
        return data.get('docs')

    def format_search_result(self, doc):
        ''' turn one search doc into a SearchResult '''
        # build the absolute id from the openlibrary key
        key = self.books_url + doc['key']
        author = doc.get('author_name') or ['Unknown']
        return SearchResult(
            doc.get('title'),
            key,
            ', '.join(author),
            doc.get('first_publish_year'),
        )

    def load_edition_data(self, olkey):
        ''' query openlibrary for editions of a work '''
        url = '%s/works/%s/editions.json' % (self.books_url, olkey)
        return get_data(url)

    def expand_book_data(self, book):
        ''' store every edition of the book's work that isn't saved yet

        ``book`` may be a work or an edition; for an edition its
        ``parent_work`` is used. ``create_book`` is not defined in this
        class -- presumably inherited from AbstractConnector.
        '''
        work = book
        if isinstance(book, models.Edition):
            work = book.parent_work

        edition_options = self.load_edition_data(work.openlibrary_key)
        # a response without 'entries' means there is nothing to add
        for edition_data in edition_options.get('entries') or []:
            olkey = edition_data.get('key').split('/')[-1]
            if models.Edition.objects.filter(openlibrary_key=olkey).exists():
                continue
            edition = self.create_book(olkey, edition_data, models.Edition)
            edition.parent_work = work
            edition.save()
            # a related manager is always truthy, so ask the database
            # whether any authors are attached before copying the work's
            if not edition.authors.exists() and work.authors.exists():
                edition.authors.set(work.authors.all())

    def get_or_create_author(self, olkey):
        ''' load that author

        Returns the stored author with this openlibrary key, or fetches
        the author record, saves it and returns the new object.
        Raises ValueError when ``olkey`` is not of the form "OL<digits>A".
        '''
        if not re.match(r'^OL\d+A$', olkey):
            raise ValueError('Invalid OpenLibrary author ID')
        try:
            return models.Author.objects.get(openlibrary_key=olkey)
        except models.Author.DoesNotExist:
            pass

        url = '%s/authors/%s.json' % (self.base_url, olkey)
        data = get_data(url)

        author = models.Author(openlibrary_key=olkey)
        mappings = {
            'birth_date': ('born', get_date),
            'death_date': ('died', get_date),
            'bio': ('bio', get_description),
        }
        author = update_from_mappings(author, data, mappings)
        name = data.get('name')
        # TODO this is making some BOLD assumption
        # (the last space-separated token is treated as the last name)
        if name:
            name_parts = name.split(' ')
            author.last_name = name_parts[-1]
            author.first_name = ' '.join(name_parts[:-1])
        author.save()

        return author


def get_description(description_blob):
    ''' descriptions can be a string or a dict '''
    if isinstance(description_blob, dict):
        return description_blob.get('value')
    return description_blob


def get_languages(language_blob):
    ''' /languages/eng -> English

    Maps each ``{'key': ...}`` entry through the ``languages`` table;
    keys that are not in the table come back as None.
    '''
    return [languages.get(lang.get('key', ''), None) for lang in language_blob]


def pick_default_edition(options):
    ''' favor physical copies with covers in english

    Each preference narrows the candidates only if at least one edition
    satisfies it; otherwise the previous candidates are kept. Returns
    None for an empty/missing list, else the first remaining edition.
    '''
    if not options:
        return None
    if len(options) == 1:
        return options[0]

    # the field is 'covers' (plural), as read in get_cover_from_data
    options = [e for e in options if e.get('covers')] or options
    options = [
        e for e in options
        if '/languages/eng' in str(e.get('languages'))
    ] or options
    formats = ['paperback', 'hardcover', 'mass market paperback']
    options = [
        e for e in options
        if str(e.get('physical_format')).lower() in formats
    ] or options
    options = [e for e in options if e.get('isbn_13')] or options
    options = [e for e in options if e.get('ocaid')] or options
    return options[0]