Type annotations for utils

This commit is contained in:
Joeri de Ruiter 2023-07-23 20:50:44 +02:00
parent 07aca2f62c
commit 0354e53eea
9 changed files with 130 additions and 66 deletions

View file

@ -1,7 +1,10 @@
""" basics for an activitypub serializer """ """ basics for an activitypub serializer """
from __future__ import annotations
from dataclasses import dataclass, fields, MISSING from dataclasses import dataclass, fields, MISSING
from json import JSONEncoder from json import JSONEncoder
import logging import logging
from typing import Optional, Type, TypeVar
import requests import requests
from django.apps import apps from django.apps import apps
@ -10,6 +13,7 @@ from django.utils.http import http_date
from bookwyrm import models from bookwyrm import models
from bookwyrm.connectors import ConnectorException, get_data from bookwyrm.connectors import ConnectorException, get_data
from bookwyrm.models.base_model import BookWyrmModel
from bookwyrm.signatures import make_signature from bookwyrm.signatures import make_signature
from bookwyrm.settings import DOMAIN, INSTANCE_ACTOR_USERNAME from bookwyrm.settings import DOMAIN, INSTANCE_ACTOR_USERNAME
from bookwyrm.tasks import app, MISC from bookwyrm.tasks import app, MISC
@ -58,6 +62,9 @@ def naive_parse(activity_objects, activity_json, serializer=None):
return serializer(activity_objects=activity_objects, **activity_json) return serializer(activity_objects=activity_objects, **activity_json)
TModel = TypeVar("TModel", bound=BookWyrmModel)
@dataclass(init=False) @dataclass(init=False)
class ActivityObject: class ActivityObject:
"""actor activitypub json""" """actor activitypub json"""
@ -101,13 +108,13 @@ class ActivityObject:
# pylint: disable=too-many-locals,too-many-branches,too-many-arguments # pylint: disable=too-many-locals,too-many-branches,too-many-arguments
def to_model( def to_model(
self, self,
model=None, model: Optional[Type[TModel]] = None,
instance=None, instance: Optional[TModel] = None,
allow_create=True, allow_create: bool = True,
save=True, save: bool = True,
overwrite=True, overwrite: bool = True,
allow_external_connections=True, allow_external_connections: bool = True,
): ) -> Optional[TModel]:
"""convert from an activity to a model instance. Args: """convert from an activity to a model instance. Args:
model: the django model that this object is being converted to model: the django model that this object is being converted to
(will guess if not known) (will guess if not known)

View file

@ -1,6 +1,6 @@
""" book and author data """ """ book and author data """
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List from typing import List, Optional
from .base_activity import ActivityObject from .base_activity import ActivityObject
from .image import Document from .image import Document
@ -11,17 +11,17 @@ from .image import Document
class BookData(ActivityObject): class BookData(ActivityObject):
"""shared fields for all book data and authors""" """shared fields for all book data and authors"""
openlibraryKey: str = None openlibraryKey: Optional[str] = None
inventaireId: str = None inventaireId: Optional[str] = None
librarythingKey: str = None librarythingKey: Optional[str] = None
goodreadsKey: str = None goodreadsKey: Optional[str] = None
bnfId: str = None bnfId: Optional[str] = None
viaf: str = None viaf: Optional[str] = None
wikidata: str = None wikidata: Optional[str] = None
asin: str = None asin: Optional[str] = None
aasin: str = None aasin: Optional[str] = None
isfdb: str = None isfdb: Optional[str] = None
lastEditedBy: str = None lastEditedBy: Optional[str] = None
links: List[str] = field(default_factory=lambda: []) links: List[str] = field(default_factory=lambda: [])
fileLinks: List[str] = field(default_factory=lambda: []) fileLinks: List[str] = field(default_factory=lambda: [])
@ -83,11 +83,11 @@ class Author(BookData):
"""author of a book""" """author of a book"""
name: str name: str
isni: str = None isni: Optional[str] = None
viafId: str = None viafId: Optional[str] = None
gutenbergId: str = None gutenbergId: Optional[str] = None
born: str = None born: Optional[str] = None
died: str = None died: Optional[str] = None
aliases: List[str] = field(default_factory=lambda: []) aliases: List[str] = field(default_factory=lambda: [])
bio: str = "" bio: str = ""
wikipediaLink: str = "" wikipediaLink: str = ""

View file

@ -1,5 +1,7 @@
""" database schema for info about authors """ """ database schema for info about authors """
import re import re
from typing import Tuple, Any
from django.contrib.postgres.indexes import GinIndex from django.contrib.postgres.indexes import GinIndex
from django.db import models from django.db import models
@ -38,7 +40,7 @@ class Author(BookDataModel):
) )
bio = fields.HtmlField(null=True, blank=True) bio = fields.HtmlField(null=True, blank=True)
def save(self, *args, **kwargs): def save(self, *args: Tuple[Any, ...], **kwargs: dict[str, Any]) -> None:
"""normalize isni format""" """normalize isni format"""
if self.isni: if self.isni:
self.isni = re.sub(r"\s", "", self.isni) self.isni = re.sub(r"\s", "", self.isni)

View file

@ -1,8 +1,15 @@
""" Custom handler for caching """ """ Custom handler for caching """
from typing import Any, Callable, Tuple, Union
from django.core.cache import cache from django.core.cache import cache
def get_or_set(cache_key, function, *args, timeout=None): def get_or_set(
cache_key: str,
function: Callable[..., Any],
*args: Tuple[Any, ...],
timeout: Union[float, None] = None
) -> Any:
"""Django's built-in get_or_set isn't cutting it""" """Django's built-in get_or_set isn't cutting it"""
value = cache.get(cache_key) value = cache.get(cache_key)
if value is None: if value is None:

View file

@ -1,15 +1,24 @@
"""ISNI author checking utilities""" """ISNI author checking utilities"""
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from typing import Union, Optional
import requests import requests
from bookwyrm import activitypub, models from bookwyrm import activitypub, models
def request_isni_data(search_index, search_term, max_records=5): def get_element_text(element: Optional[ET.Element]) -> str:
"""If the element is not None and there is a text attribute return this"""
if element is not None and element.text is not None:
return element.text
return ""
def request_isni_data(search_index: str, search_term: str, max_records: int = 5) -> str:
"""Request data from the ISNI API""" """Request data from the ISNI API"""
search_string = f'{search_index}="{search_term}"' search_string = f'{search_index}="{search_term}"'
query_params = { query_params: dict[str, Union[str, int]] = {
"query": search_string, "query": search_string,
"version": "1.1", "version": "1.1",
"operation": "searchRetrieve", "operation": "searchRetrieve",
@ -26,41 +35,52 @@ def request_isni_data(search_index, search_term, max_records=5):
return result.text return result.text
def make_name_string(element): def make_name_string(element: ET.Element) -> str:
"""create a string of form 'personal_name surname'""" """create a string of form 'personal_name surname'"""
# NOTE: this will often be incorrect, many naming systems # NOTE: this will often be incorrect, many naming systems
# list "surname" before personal name # list "surname" before personal name
forename = element.find(".//forename") forename = element.find(".//forename")
surname = element.find(".//surname") surname = element.find(".//surname")
if forename is not None:
return "".join([forename.text, " ", surname.text]) forename_text = get_element_text(forename)
return surname.text surname_text = get_element_text(surname)
return "".join(
[forename_text, " " if forename_text and surname_text else "", surname_text]
)
def get_other_identifier(element, code): def get_other_identifier(element: ET.Element, code: str) -> str:
"""Get other identifiers associated with an author from their ISNI record""" """Get other identifiers associated with an author from their ISNI record"""
identifiers = element.findall(".//otherIdentifierOfIdentity") identifiers = element.findall(".//otherIdentifierOfIdentity")
for section_head in identifiers: for section_head in identifiers:
if ( if (
section_head.find(".//type") is not None (section_type := section_head.find(".//type")) is not None
and section_head.find(".//type").text == code and section_type.text is not None
and section_head.find(".//identifier") is not None and section_type.text == code
and (identifier := section_head.find(".//identifier")) is not None
and identifier.text is not None
): ):
return section_head.find(".//identifier").text return identifier.text
# if we can't find it in otherIdentifierOfIdentity, # if we can't find it in otherIdentifierOfIdentity,
# try sources # try sources
for source in element.findall(".//sources"): for source in element.findall(".//sources"):
code_of_source = source.find(".//codeOfSource") if (
if code_of_source is not None and code_of_source.text.lower() == code.lower(): (code_of_source := source.find(".//codeOfSource")) is not None
return source.find(".//sourceIdentifier").text and code_of_source.text is not None
and code_of_source.text.lower() == code.lower()
and (source_identifier := source.find(".//sourceIdentifier")) is not None
and source_identifier.text is not None
):
return source_identifier.text
return "" return ""
def get_external_information_uri(element, match_string): def get_external_information_uri(element: ET.Element, match_string: str) -> str:
"""Get URLs associated with an author from their ISNI record""" """Get URLs associated with an author from their ISNI record"""
sources = element.findall(".//externalInformation") sources = element.findall(".//externalInformation")
@ -69,14 +89,18 @@ def get_external_information_uri(element, match_string):
uri = source.find(".//URI") uri = source.find(".//URI")
if ( if (
uri is not None uri is not None
and uri.text is not None
and information is not None and information is not None
and information.text is not None
and information.text.lower() == match_string.lower() and information.text.lower() == match_string.lower()
): ):
return uri.text return uri.text
return "" return ""
def find_authors_by_name(name_string, description=False): def find_authors_by_name(
name_string: str, description: bool = False
) -> list[activitypub.Author]:
"""Query the ISNI database for possible author matches by name""" """Query the ISNI database for possible author matches by name"""
payload = request_isni_data("pica.na", name_string) payload = request_isni_data("pica.na", name_string)
@ -92,7 +116,11 @@ def find_authors_by_name(name_string, description=False):
if not personal_name: if not personal_name:
continue continue
author = get_author_from_isni(element.find(".//isniUnformatted").text) author = get_author_from_isni(
get_element_text(element.find(".//isniUnformatted"))
)
if author is None:
continue
if bool(description): if bool(description):
@ -111,22 +139,23 @@ def find_authors_by_name(name_string, description=False):
# some of the "titles" in ISNI are a little ...iffy # some of the "titles" in ISNI are a little ...iffy
# @ is used by ISNI/OCLC to index the starting point ignoring stop words # @ is used by ISNI/OCLC to index the starting point ignoring stop words
# (e.g. "The @Government of no one") # (e.g. "The @Government of no one")
title_elements = [ author.bio = ""
e for title in titles:
for e in titles if (
if hasattr(e, "text") and not e.text.replace("@", "").isnumeric() title is not None
] and hasattr(title, "text")
if len(title_elements): and title.text is not None
author.bio = title_elements[0].text.replace("@", "") and not title.text.replace("@", "").isnumeric()
else: ):
author.bio = None author.bio = title.text.replace("@", "")
break
possible_authors.append(author) possible_authors.append(author)
return possible_authors return possible_authors
def get_author_from_isni(isni): def get_author_from_isni(isni: str) -> Optional[activitypub.Author]:
"""Find data to populate a new author record from their ISNI""" """Find data to populate a new author record from their ISNI"""
payload = request_isni_data("pica.isn", isni) payload = request_isni_data("pica.isn", isni)
@ -135,25 +164,32 @@ def get_author_from_isni(isni):
# there should only be a single responseRecord # there should only be a single responseRecord
# but let's use the first one just in case # but let's use the first one just in case
element = root.find(".//responseRecord") element = root.find(".//responseRecord")
name = make_name_string(element.find(".//forename/..")) if element is None:
# TODO What if there is no responseRecord?
# return empty Author or None, or raise Exception?
return None
name = (
make_name_string(forename)
if (forename := element.find(".//forename/..")) is not None
else ""
)
viaf = get_other_identifier(element, "viaf") viaf = get_other_identifier(element, "viaf")
# use a set to dedupe aliases in ISNI # use a set to dedupe aliases in ISNI
aliases = set() aliases = set()
aliases_element = element.findall(".//personalNameVariant") aliases_element = element.findall(".//personalNameVariant")
for entry in aliases_element: for entry in aliases_element:
aliases.add(make_name_string(entry)) aliases.add(make_name_string(entry))
# aliases needs to be list not set bio = get_element_text(element.find(".//nameTitle"))
aliases = list(aliases)
bio = element.find(".//nameTitle")
bio = bio.text if bio is not None else ""
wikipedia = get_external_information_uri(element, "Wikipedia") wikipedia = get_external_information_uri(element, "Wikipedia")
author = activitypub.Author( author = activitypub.Author(
id=element.find(".//isniURI").text, id=get_element_text(element.find(".//isniURI")),
name=name, name=name,
isni=isni, isni=isni,
viafId=viaf, viafId=viaf,
aliases=aliases, # aliases needs to be list not set
aliases=list(aliases),
bio=bio, bio=bio,
wikipediaLink=wikipedia, wikipediaLink=wikipedia,
) )
@ -161,21 +197,27 @@ def get_author_from_isni(isni):
return author return author
def build_author_from_isni(match_value): def build_author_from_isni(match_value: str) -> dict[str, activitypub.Author]:
"""Build basic author class object from ISNI URL""" """Build basic author class object from ISNI URL"""
# if it is an isni value get the data # if it is an isni value get the data
if match_value.startswith("https://isni.org/isni/"): if match_value.startswith("https://isni.org/isni/"):
isni = match_value.replace("https://isni.org/isni/", "") isni = match_value.replace("https://isni.org/isni/", "")
return {"author": get_author_from_isni(isni)} author = get_author_from_isni(isni)
if author is not None:
return {"author": author}
# otherwise it's a name string # otherwise it's a name string
return {} return {}
def augment_author_metadata(author, isni): def augment_author_metadata(author: models.Author, isni: str) -> None:
"""Update any missing author fields from ISNI data""" """Update any missing author fields from ISNI data"""
isni_author = get_author_from_isni(isni) isni_author = get_author_from_isni(isni)
if isni_author is None:
# TODO Should we just return or raise an exception to indicate a problem?
return
isni_author.to_model(model=models.Author, instance=author, overwrite=False) isni_author.to_model(model=models.Author, instance=author, overwrite=False)
# we DO want to overwrite aliases because we're adding them to the # we DO want to overwrite aliases because we're adding them to the

View file

@ -10,7 +10,7 @@ class IgnoreVariableDoesNotExist(logging.Filter):
these errors are not useful to us. these errors are not useful to us.
""" """
def filter(self, record): def filter(self, record: logging.LogRecord) -> bool:
if record.exc_info: if record.exc_info:
(_, err_value, _) = record.exc_info (_, err_value, _) = record.exc_info
while err_value: while err_value:

View file

@ -2,7 +2,7 @@
import bleach import bleach
def clean(input_text): def clean(input_text: str) -> str:
"""Run through "bleach" """ """Run through "bleach" """
return bleach.clean( return bleach.clean(
input_text, input_text,

View file

@ -1,8 +1,10 @@
"""Validations""" """Validations"""
from typing import Optional
from bookwyrm.settings import DOMAIN, USE_HTTPS from bookwyrm.settings import DOMAIN, USE_HTTPS
def validate_url_domain(url): def validate_url_domain(url: str) -> Optional[str]:
"""Basic check that the URL starts with the instance domain name""" """Basic check that the URL starts with the instance domain name"""
if not url: if not url:
return None return None

View file

@ -10,6 +10,10 @@ django_settings_module = "bookwyrm.settings"
ignore_errors = True ignore_errors = True
implicit_reexport = True implicit_reexport = True
[mypy-bookwyrm.utils.*]
ignore_errors = False
disallow_untyped_calls = False
[mypy-celerywyrm.*] [mypy-celerywyrm.*]
ignore_errors = False ignore_errors = False