mirror of
https://github.com/bookwyrm-social/bookwyrm.git
synced 2025-01-27 09:28:08 +00:00
Type annotations for utils
This commit is contained in:
parent
0f8da5b738
commit
ff8e4597e5
6 changed files with 98 additions and 41 deletions
|
@ -1,5 +1,7 @@
|
|||
""" database schema for info about authors """
|
||||
import re
|
||||
from typing import Tuple, Any
|
||||
|
||||
from django.contrib.postgres.indexes import GinIndex
|
||||
from django.db import models
|
||||
|
||||
|
@ -38,7 +40,7 @@ class Author(BookDataModel):
|
|||
)
|
||||
bio = fields.HtmlField(null=True, blank=True)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
def save(self, *args: Tuple[Any, ...], **kwargs: dict[str, Any]) -> None:
|
||||
"""normalize isni format"""
|
||||
if self.isni:
|
||||
self.isni = re.sub(r"\s", "", self.isni)
|
||||
|
|
|
@ -1,8 +1,15 @@
|
|||
""" Custom handler for caching """
|
||||
from typing import Any, Callable, Tuple, Union
|
||||
|
||||
from django.core.cache import cache
|
||||
|
||||
|
||||
def get_or_set(cache_key, function, *args, timeout=None):
|
||||
def get_or_set(
|
||||
cache_key: str,
|
||||
function: Callable[..., Any],
|
||||
*args: Tuple[Any, ...],
|
||||
timeout: Union[float, None] = None
|
||||
) -> Any:
|
||||
"""Django's built-in get_or_set isn't cutting it"""
|
||||
value = cache.get(cache_key)
|
||||
if value is None:
|
||||
|
|
|
@ -1,15 +1,24 @@
|
|||
"""ISNI author checking utilities"""
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import Union, Optional
|
||||
|
||||
import requests
|
||||
|
||||
from bookwyrm import activitypub, models
|
||||
|
||||
|
||||
def request_isni_data(search_index, search_term, max_records=5):
|
||||
def get_element_text(element: Optional[ET.Element]) -> str:
|
||||
"""If the element is not None and there is a text attribute return this"""
|
||||
if element is not None and element.text is not None:
|
||||
return element.text
|
||||
return ""
|
||||
|
||||
|
||||
def request_isni_data(search_index: str, search_term: str, max_records: int = 5) -> str:
|
||||
"""Request data from the ISNI API"""
|
||||
|
||||
search_string = f'{search_index}="{search_term}"'
|
||||
query_params = {
|
||||
query_params: dict[str, Union[str, int]] = {
|
||||
"query": search_string,
|
||||
"version": "1.1",
|
||||
"operation": "searchRetrieve",
|
||||
|
@ -26,41 +35,52 @@ def request_isni_data(search_index, search_term, max_records=5):
|
|||
return result.text
|
||||
|
||||
|
||||
def make_name_string(element):
|
||||
def make_name_string(element: ET.Element) -> str:
|
||||
"""create a string of form 'personal_name surname'"""
|
||||
|
||||
# NOTE: this will often be incorrect, many naming systems
|
||||
# list "surname" before personal name
|
||||
forename = element.find(".//forename")
|
||||
surname = element.find(".//surname")
|
||||
if forename is not None:
|
||||
return "".join([forename.text, " ", surname.text])
|
||||
return surname.text
|
||||
|
||||
forename_text = get_element_text(forename)
|
||||
surname_text = get_element_text(surname)
|
||||
|
||||
return "".join(
|
||||
[forename_text, " " if forename_text and surname_text else "", surname_text]
|
||||
)
|
||||
|
||||
|
||||
def get_other_identifier(element, code):
|
||||
def get_other_identifier(element: ET.Element, code: str) -> str:
|
||||
"""Get other identifiers associated with an author from their ISNI record"""
|
||||
|
||||
identifiers = element.findall(".//otherIdentifierOfIdentity")
|
||||
for section_head in identifiers:
|
||||
if (
|
||||
section_head.find(".//type") is not None
|
||||
and section_head.find(".//type").text == code
|
||||
and section_head.find(".//identifier") is not None
|
||||
(section_type := section_head.find(".//type")) is not None
|
||||
and section_type.text is not None
|
||||
and section_type.text == code
|
||||
and (identifier := section_head.find(".//identifier")) is not None
|
||||
and identifier.text is not None
|
||||
):
|
||||
return section_head.find(".//identifier").text
|
||||
return identifier.text
|
||||
|
||||
# if we can't find it in otherIdentifierOfIdentity,
|
||||
# try sources
|
||||
for source in element.findall(".//sources"):
|
||||
code_of_source = source.find(".//codeOfSource")
|
||||
if code_of_source is not None and code_of_source.text.lower() == code.lower():
|
||||
return source.find(".//sourceIdentifier").text
|
||||
if (
|
||||
(code_of_source := source.find(".//codeOfSource")) is not None
|
||||
and code_of_source.text is not None
|
||||
and code_of_source.text.lower() == code.lower()
|
||||
and (source_identifier := source.find(".//sourceIdentifier")) is not None
|
||||
and source_identifier.text is not None
|
||||
):
|
||||
return source_identifier.text
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
def get_external_information_uri(element, match_string):
|
||||
def get_external_information_uri(element: ET.Element, match_string: str) -> str:
|
||||
"""Get URLs associated with an author from their ISNI record"""
|
||||
|
||||
sources = element.findall(".//externalInformation")
|
||||
|
@ -69,14 +89,18 @@ def get_external_information_uri(element, match_string):
|
|||
uri = source.find(".//URI")
|
||||
if (
|
||||
uri is not None
|
||||
and uri.text is not None
|
||||
and information is not None
|
||||
and information.text is not None
|
||||
and information.text.lower() == match_string.lower()
|
||||
):
|
||||
return uri.text
|
||||
return ""
|
||||
|
||||
|
||||
def find_authors_by_name(name_string, description=False):
|
||||
def find_authors_by_name(
|
||||
name_string: str, description: bool = False
|
||||
) -> list[activitypub.Author]:
|
||||
"""Query the ISNI database for possible author matches by name"""
|
||||
|
||||
payload = request_isni_data("pica.na", name_string)
|
||||
|
@ -92,7 +116,11 @@ def find_authors_by_name(name_string, description=False):
|
|||
if not personal_name:
|
||||
continue
|
||||
|
||||
author = get_author_from_isni(element.find(".//isniUnformatted").text)
|
||||
author = get_author_from_isni(
|
||||
get_element_text(element.find(".//isniUnformatted"))
|
||||
)
|
||||
if author is None:
|
||||
continue
|
||||
|
||||
if bool(description):
|
||||
|
||||
|
@ -111,22 +139,23 @@ def find_authors_by_name(name_string, description=False):
|
|||
# some of the "titles" in ISNI are a little ...iffy
|
||||
# @ is used by ISNI/OCLC to index the starting point ignoring stop words
|
||||
# (e.g. "The @Government of no one")
|
||||
title_elements = [
|
||||
e
|
||||
for e in titles
|
||||
if hasattr(e, "text") and not e.text.replace("@", "").isnumeric()
|
||||
]
|
||||
if len(title_elements):
|
||||
author.bio = title_elements[0].text.replace("@", "")
|
||||
else:
|
||||
author.bio = None
|
||||
author.bio = ""
|
||||
for title in titles:
|
||||
if (
|
||||
title is not None
|
||||
and hasattr(title, "text")
|
||||
and title.text is not None
|
||||
and not title.text.replace("@", "").isnumeric()
|
||||
):
|
||||
author.bio = title.text.replace("@", "")
|
||||
break
|
||||
|
||||
possible_authors.append(author)
|
||||
|
||||
return possible_authors
|
||||
|
||||
|
||||
def get_author_from_isni(isni):
|
||||
def get_author_from_isni(isni: str) -> Optional[activitypub.Author]:
|
||||
"""Find data to populate a new author record from their ISNI"""
|
||||
|
||||
payload = request_isni_data("pica.isn", isni)
|
||||
|
@ -135,25 +164,32 @@ def get_author_from_isni(isni):
|
|||
# there should only be a single responseRecord
|
||||
# but let's use the first one just in case
|
||||
element = root.find(".//responseRecord")
|
||||
name = make_name_string(element.find(".//forename/.."))
|
||||
if element is None:
|
||||
# TODO What if there is no responseRecord?
|
||||
# return empty Author or None, or raise Exception?
|
||||
return None
|
||||
|
||||
name = (
|
||||
make_name_string(forename)
|
||||
if (forename := element.find(".//forename/..")) is not None
|
||||
else ""
|
||||
)
|
||||
viaf = get_other_identifier(element, "viaf")
|
||||
# use a set to dedupe aliases in ISNI
|
||||
aliases = set()
|
||||
aliases_element = element.findall(".//personalNameVariant")
|
||||
for entry in aliases_element:
|
||||
aliases.add(make_name_string(entry))
|
||||
# aliases needs to be list not set
|
||||
aliases = list(aliases)
|
||||
bio = element.find(".//nameTitle")
|
||||
bio = bio.text if bio is not None else ""
|
||||
bio = get_element_text(element.find(".//nameTitle"))
|
||||
wikipedia = get_external_information_uri(element, "Wikipedia")
|
||||
|
||||
author = activitypub.Author(
|
||||
id=element.find(".//isniURI").text,
|
||||
id=get_element_text(element.find(".//isniURI")),
|
||||
name=name,
|
||||
isni=isni,
|
||||
viafId=viaf,
|
||||
aliases=aliases,
|
||||
# aliases needs to be list not set
|
||||
aliases=list(aliases),
|
||||
bio=bio,
|
||||
wikipediaLink=wikipedia,
|
||||
)
|
||||
|
@ -161,21 +197,27 @@ def get_author_from_isni(isni):
|
|||
return author
|
||||
|
||||
|
||||
def build_author_from_isni(match_value):
|
||||
def build_author_from_isni(match_value: str) -> dict[str, activitypub.Author]:
|
||||
"""Build basic author class object from ISNI URL"""
|
||||
|
||||
# if it is an isni value get the data
|
||||
if match_value.startswith("https://isni.org/isni/"):
|
||||
isni = match_value.replace("https://isni.org/isni/", "")
|
||||
return {"author": get_author_from_isni(isni)}
|
||||
author = get_author_from_isni(isni)
|
||||
if author is not None:
|
||||
return {"author": author}
|
||||
# otherwise it's a name string
|
||||
return {}
|
||||
|
||||
|
||||
def augment_author_metadata(author, isni):
|
||||
def augment_author_metadata(author: models.Author, isni: str) -> None:
|
||||
"""Update any missing author fields from ISNI data"""
|
||||
|
||||
isni_author = get_author_from_isni(isni)
|
||||
if isni_author is None:
|
||||
# TODO Should we just return or raise an exception to indicate a problem?
|
||||
return
|
||||
|
||||
isni_author.to_model(model=models.Author, instance=author, overwrite=False)
|
||||
|
||||
# we DO want to overwrite aliases because we're adding them to the
|
||||
|
|
|
@ -10,7 +10,7 @@ class IgnoreVariableDoesNotExist(logging.Filter):
|
|||
these errors are not useful to us.
|
||||
"""
|
||||
|
||||
def filter(self, record):
|
||||
def filter(self, record: logging.LogRecord) -> bool:
|
||||
if record.exc_info:
|
||||
(_, err_value, _) = record.exc_info
|
||||
while err_value:
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
"""Validations"""
|
||||
from typing import Optional
|
||||
|
||||
from bookwyrm.settings import DOMAIN, USE_HTTPS
|
||||
|
||||
|
||||
def validate_url_domain(url):
|
||||
def validate_url_domain(url: str) -> Optional[str]:
|
||||
"""Basic check that the URL starts with the instance domain name"""
|
||||
if not url:
|
||||
return None
|
||||
|
|
4
mypy.ini
4
mypy.ini
|
@ -13,6 +13,10 @@ implicit_reexport = True
|
|||
[mypy-bookwyrm.connectors.*]
|
||||
ignore_errors = False
|
||||
|
||||
[mypy-bookwyrm.utils.*]
|
||||
ignore_errors = False
|
||||
disallow_untyped_calls = False
|
||||
|
||||
[mypy-celerywyrm.*]
|
||||
ignore_errors = False
|
||||
|
||||
|
|
Loading…
Reference in a new issue