""" activitypub-aware django model fields """
from dataclasses import MISSING
import imghdr
import re
from uuid import uuid4
from urllib.parse import urljoin

import dateutil.parser
from dateutil.parser import ParserError
from django.contrib.postgres.fields import ArrayField as DjangoArrayField
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.db import models
from django.forms import ClearableFileInput, ImageField as DjangoImageField
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.utils.encoding import filepath_to_uri

from bookwyrm import activitypub
from bookwyrm.connectors import get_image
from bookwyrm.sanitize_html import InputHtmlParser
from bookwyrm.settings import MEDIA_FULL_URL


def validate_remote_id(value):
    """make sure the remote_id looks like a url

    Raises ValidationError when the value is empty or is not an http(s) url
    free of whitespace.
    """
    # `https?` rather than `http.?`: the latter accepted any character after
    # "http" (e.g. "httpx://..."), which is not a url scheme we can resolve
    if not value or not re.match(r"^https?:\/\/[^\s]+$", value):
        raise ValidationError(
            _("%(value)s is not a valid remote_id"),
            params={"value": value},
        )


def validate_localname(value):
    """make sure localnames look okay

    Raises ValidationError unless the value consists only of ascii letters,
    digits, dashes, underscores, and dots.
    """
    if not re.match(r"^[A-Za-z\-_\.0-9]+$", value):
        raise ValidationError(
            _("%(value)s is not a valid username"),
            params={"value": value},
        )


def validate_username(value):
    """make sure usernames look okay

    Raises ValidationError unless the value has the form localname@domain.tld
    """
    if not re.match(r"^[A-Za-z\-_\.0-9]+@[A-Za-z\-_\.0-9]+\.[a-z]{2,}$", value):
        raise ValidationError(
            _("%(value)s is not a valid username"),
            params={"value": value},
        )


class ActivitypubFieldMixin:
    """make a database field serializable"""

    def __init__(
        self,
        *args,
        activitypub_field=None,
        activitypub_wrapper=None,
        deduplication_field=False,
        **kwargs,
    ):
        self.deduplication_field = deduplication_field
        if activitypub_wrapper:
            # the names are swapped here: get_activitypub_field() returns the
            # wrapper name as the top-level key of the activity, while
            # field_to_activity/field_from_activity use the original
            # activitypub_field as the key inside the wrapped dict
            self.activitypub_wrapper = activitypub_field
            self.activitypub_field = activitypub_wrapper
        else:
            self.activitypub_field = activitypub_field
        super().__init__(*args, **kwargs)

    def set_field_from_activity(self, instance, data, overwrite=True):
        """helper function for assigning a value to the field.

        Returns True if the value on the instance was changed, False if not.
        """
        try:
            value = getattr(data, self.get_activitypub_field())
        except AttributeError:
            # massively hack-y workaround for boosts
            if self.get_activitypub_field() != "attributedTo":
                raise
            value = getattr(data, "actor")
        formatted = self.field_from_activity(value)
        if formatted is None or formatted is MISSING or formatted == {}:
            return False

        current_value = (
            getattr(instance, self.name) if hasattr(instance, self.name) else None
        )
        # if we're not in overwrite mode, only continue updating the field if
        # it's unset
        if current_value and not overwrite:
            return False

        # the field is unchanged
        if current_value == formatted:
            return False

        setattr(instance, self.name, formatted)
        return True

    def set_activity_from_field(self, activity, instance):
        """update the json object"""
        value = getattr(instance, self.name)
        formatted = self.field_to_activity(value)
        if formatted is None:
            return

        key = self.get_activitypub_field()
        # TODO: surely there's a better way
        if instance.__class__.__name__ == "Boost" and key == "attributedTo":
            key = "actor"
        if isinstance(activity.get(key), list):
            # several fields can feed the same list-valued key
            activity[key] += formatted
        else:
            activity[key] = formatted

    def field_to_activity(self, value):
        """formatter to convert a model value into activitypub"""
        if hasattr(self, "activitypub_wrapper"):
            return {self.activitypub_wrapper: value}
        return value

    def field_from_activity(self, value):
        """formatter to convert activitypub into a model value"""
        if value and hasattr(self, "activitypub_wrapper"):
            value = value.get(self.activitypub_wrapper)
        return value

    def get_activitypub_field(self):
        """model_field_name to activitypubFieldName"""
        if self.activitypub_field:
            return self.activitypub_field
        name = self.name.split(".")[-1]
        components = name.split("_")
        return components[0] + "".join(x.title() for x in components[1:])


class ActivitypubRelatedFieldMixin(ActivitypubFieldMixin):
    """default (de)serialization for foreign key and one to one"""

    def __init__(self, *args, load_remote=True, **kwargs):
        self.load_remote = load_remote
        super().__init__(*args, **kwargs)

    def field_from_activity(self, value):
        """find or load the related model instance for an activity value

        The value may be an activitypub object (anything with a truthy `id`)
        or a remote id string; anything else yields None.
        """
        if not value:
            return None

        related_model = self.related_model
        if hasattr(value, "id") and value.id:
            if not self.load_remote:
                # only look in the local database
                return related_model.find_existing(value.serialize())
            # this is an activitypub object, which we can deserialize
            return value.to_model(model=related_model)
        try:
            # make sure the value looks like a remote id
            validate_remote_id(value)
        except ValidationError:
            # we don't know what this is, ignore it
            return None
        # gets or creates the model field from the remote id
        if not self.load_remote:
            # only look in the local database
            return related_model.find_existing_by_remote_id(value)
        return activitypub.resolve_remote_id(value, model=related_model)


class RemoteIdField(ActivitypubFieldMixin, models.CharField):
    """a url that serves as a unique identifier"""

    def __init__(self, *args, max_length=255, validators=None, **kwargs):
        validators = validators or [validate_remote_id]
        super().__init__(*args, max_length=max_length, validators=validators, **kwargs)
        # for this field, the default is true. false everywhere else.
        self.deduplication_field = kwargs.get("deduplication_field", True)


class UsernameField(ActivitypubFieldMixin, models.CharField):
    """activitypub-aware username field"""

    def __init__(self, activitypub_field="preferredUsername", **kwargs):
        self.activitypub_field = activitypub_field
        # I don't totally know why pylint is mad at this, but it makes it work
        # (super(ActivitypubFieldMixin, self) skips the mixin's __init__ and
        # calls the next __init__ in the MRO with the fixed arguments below;
        # note that **kwargs is accepted but not forwarded)
        super(ActivitypubFieldMixin, self).__init__(  # pylint: disable=bad-super-call
            _("username"),
            max_length=150,
            unique=True,
            validators=[validate_username],
            error_messages={
                "unique": _("A user with that username already exists."),
            },
        )

    def deconstruct(self):
        """implementation of models.Field deconstruct"""
        name, path, args, kwargs = super().deconstruct()
        # these are hard-coded in __init__, so they must not be passed back in
        del kwargs["verbose_name"]
        del kwargs["max_length"]
        del kwargs["unique"]
        del kwargs["validators"]
        del kwargs["error_messages"]
        return name, path, args, kwargs

    def field_to_activity(self, value):
        """only the part before the "@" is serialized"""
        return value.split("@")[0]


PrivacyLevels = models.TextChoices(
    "Privacy", ["public", "unlisted", "followers", "direct"]
)


class PrivacyField(ActivitypubFieldMixin, models.CharField):
    """this maps to two different activitypub fields"""

    public = "https://www.w3.org/ns/activitystreams#Public"

    def __init__(self, *args, **kwargs):
        # kwargs are accepted but not forwarded; the options below are fixed
        super().__init__(
            *args, max_length=255, choices=PrivacyLevels.choices, default="public"
        )

    # pylint: disable=invalid-name
    def set_field_from_activity(self, instance, data, overwrite=True):
        """work out the privacy level from the activity's `to` and `cc`

        Returns True if the privacy value on the instance was changed.
        Raises ValidationError if the activity has no attribute identifying
        the user.
        """
        if not overwrite:
            return False

        original = getattr(instance, self.name)
        to = data.to
        cc = data.cc

        # we need to figure out who this is to get their followers link
        user_field = None
        for field in ["attributedTo", "owner", "actor"]:
            if hasattr(data, field):
                user_field = field
                break
        if not user_field:
            raise ValidationError("No user field found for privacy", data)
        user = activitypub.resolve_remote_id(getattr(data, user_field), model="User")

        if to == [self.public]:
            privacy = "public"
        elif cc and self.public in cc:
            # this has to be checked before the followers case: an unlisted
            # activity is addressed `to` the followers collection as well
            privacy = "unlisted"
        elif to == [user.followers_url]:
            privacy = "followers"
        elif cc == []:
            privacy = "direct"
        else:
            privacy = "followers"
        setattr(instance, self.name, privacy)
        return original != getattr(instance, self.name)

    def set_activity_from_field(self, activity, instance):
        """write the `to` and `cc` lists that express the privacy level"""
        # explicitly to anyone mentioned (statuses only)
        mentions = []
        if hasattr(instance, "mention_users"):
            mentions = [u.remote_id for u in instance.mention_users.all()]
        # this is a link to the followers list
        # pylint: disable=protected-access
        followers = instance.user.followers_url
        if instance.privacy == "public":
            activity["to"] = [self.public]
            activity["cc"] = [followers] + mentions
        elif instance.privacy == "unlisted":
            activity["to"] = [followers]
            activity["cc"] = [self.public] + mentions
        elif instance.privacy == "followers":
            activity["to"] = [followers]
            activity["cc"] = mentions
        elif instance.privacy == "direct":
            activity["to"] = mentions
            activity["cc"] = []


class ForeignKey(ActivitypubRelatedFieldMixin, models.ForeignKey):
    """activitypub-aware foreign key field"""

    def field_to_activity(self, value):
        """serialized as the related object's remote id"""
        if not value:
            return None
        return value.remote_id


class OneToOneField(ActivitypubRelatedFieldMixin, models.OneToOneField):
    """activitypub-aware one to one field"""

    def field_to_activity(self, value):
        """serialized as the related object's full activity"""
        if not value:
            return None
        return value.to_activity()


class ManyToManyField(ActivitypubFieldMixin, models.ManyToManyField):
    """activitypub-aware many to many field"""

    def __init__(self, *args, link_only=False, **kwargs):
        self.link_only = link_only
        super().__init__(*args, **kwargs)

    def set_field_from_activity(self, instance, data, overwrite=True):
        """helper function for assigning a value to the field

        Returns True if the related set was replaced (the instance is saved
        in that case), False otherwise.
        """
        if not overwrite and getattr(instance, self.name).exists():
            return False

        value = getattr(data, self.get_activitypub_field())
        formatted = self.field_from_activity(value)
        if formatted is None or formatted is MISSING:
            return False
        getattr(instance, self.name).set(formatted)
        instance.save(broadcast=False)
        return True

    def field_to_activity(self, value):
        """either a link to the collection or a list of remote ids"""
        if self.link_only:
            return f"{value.instance.remote_id}/{self.name}"
        return [i.remote_id for i in value.all()]

    def field_from_activity(self, value):
        """resolve a list of remote ids, skipping entries that are not valid"""
        if value is None or value is MISSING:
            return None
        if not isinstance(value, list):
            # If this is a link, we currently aren't doing anything with it
            return None
        items = []
        for remote_id in value:
            try:
                validate_remote_id(remote_id)
            except ValidationError:
                continue
            items.append(
                activitypub.resolve_remote_id(remote_id, model=self.related_model)
            )
        return items


class TagField(ManyToManyField):
    """special case of many to many that uses Tags"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.activitypub_field = "tag"

    def field_to_activity(self, value):
        """one activitypub Link per related item; users become Mentions"""
        tags = []
        for item in value.all():
            activity_type = item.__class__.__name__
            if activity_type == "User":
                activity_type = "Mention"
            tags.append(
                activitypub.Link(
                    href=item.remote_id,
                    name=getattr(item, item.name_field),
                    type=activity_type,
                )
            )
        return tags

    def field_from_activity(self, value):
        """resolve only the tags whose type matches this field's model"""
        if not isinstance(value, list):
            return None
        items = []
        for link_json in value:
            link = activitypub.Link(**link_json)
            tag_type = link.type if link.type != "Mention" else "Person"
            if tag_type == "Book":
                tag_type = "Edition"
            if tag_type != self.related_model.activity_serializer.type:
                # tags can contain multiple types
                continue
            items.append(
                activitypub.resolve_remote_id(link.href, model=self.related_model)
            )
        return items


class ClearableFileInputWithWarning(ClearableFileInput):
    """max file size warning"""

    template_name = "widgets/clearable_file_input_with_warning.html"


class CustomImageField(DjangoImageField):
    """overwrites image field for form"""

    widget = ClearableFileInputWithWarning


class ImageField(ActivitypubFieldMixin, models.ImageField):
    """activitypub-aware image field"""

    def __init__(self, *args, alt_field=None, **kwargs):
        self.alt_field = alt_field
        super().__init__(*args, **kwargs)

    # pylint: disable=arguments-differ
    def set_field_from_activity(self, instance, data, save=True, overwrite=True):
        """helper function for assigning a value to the field

        Returns True if an image was stored on the instance, False otherwise.
        """
        value = getattr(data, self.get_activitypub_field())
        formatted = self.field_from_activity(value)
        if formatted is None or formatted is MISSING:
            return False

        if (
            not overwrite
            and hasattr(instance, self.name)
            and getattr(instance, self.name)
        ):
            return False

        # formatted is [image_name, image_content]
        getattr(instance, self.name).save(*formatted, save=save)
        return True

    def set_activity_from_field(self, activity, instance):
        """add the image, with its alt text if there is any, to the activity"""
        value = getattr(instance, self.name)
        if value is None:
            return
        # alt_field defaults to None, and getattr needs a string name
        alt_text = getattr(instance, self.alt_field) if self.alt_field else None
        formatted = self.field_to_activity(value, alt_text)

        key = self.get_activitypub_field()
        activity[key] = formatted

    def field_to_activity(self, value, alt=None):
        """an activitypub Document, or None if the image has no url"""
        url = get_absolute_url(value)

        if not url:
            return None

        return activitypub.Document(url=url, name=alt)

    def field_from_activity(self, value):
        """download the image and return [image_name, image_content]

        Returns None if the value holds no valid url or the download fails.
        """
        image_slug = value
        # when it's an inline image (User avatar/icon, Book cover), it's a json
        # blob, but when it's an attached image, it's just a url
        if hasattr(image_slug, "url"):
            url = image_slug.url
        elif isinstance(image_slug, str):
            url = image_slug
        else:
            return None

        try:
            validate_remote_id(url)
        except ValidationError:
            return None

        response = get_image(url)
        if not response:
            return None

        image_content = ContentFile(response.content)
        # the extension comes from the file contents, not from the url
        extension = imghdr.what(None, image_content.read()) or ""
        image_name = f"{uuid4()}.{extension}"
        return [image_name, image_content]

    def formfield(self, **kwargs):
        """special case for forms"""
        return super().formfield(
            **{
                "form_class": CustomImageField,
                **kwargs,
            }
        )


def get_absolute_url(value):
    """returns an absolute URL for the image, or None if it has no name"""
    name = value.name
    if not name:
        return None

    url = filepath_to_uri(name)
    if url is not None:
        # a leading slash would make urljoin discard MEDIA_FULL_URL's path
        url = url.lstrip("/")
    url = urljoin(MEDIA_FULL_URL, url)

    return url


class DateTimeField(ActivitypubFieldMixin, models.DateTimeField):
    """activitypub-aware datetime field"""

    def field_to_activity(self, value):
        """datetimes are serialized in iso format"""
        if not value:
            return None
        return value.isoformat()

    def field_from_activity(self, value):
        """parse a date string; None if it can't be parsed"""
        try:
            date_value = dateutil.parser.parse(value)
            try:
                return timezone.make_aware(date_value)
            except ValueError:
                # make_aware rejected the value; use it as parsed
                return date_value
        except (ParserError, TypeError):
            return None


class HtmlField(ActivitypubFieldMixin, models.TextField):
    """a text field for storing html"""

    def field_from_activity(self, value):
        """run incoming html through the sanitizer"""
        if not value or value is MISSING:
            return None
        sanitizer = InputHtmlParser()
        sanitizer.feed(value)
        return sanitizer.get_output()


class ArrayField(ActivitypubFieldMixin, DjangoArrayField):
    """activitypub-aware array field"""

    def field_to_activity(self, value):
        """every item is serialized as a string"""
        return [str(i) for i in value]


class CharField(ActivitypubFieldMixin, models.CharField):
    """activitypub-aware char field"""


class URLField(ActivitypubFieldMixin, models.URLField):
    """activitypub-aware url field"""


class TextField(ActivitypubFieldMixin, models.TextField):
    """activitypub-aware text field"""


class BooleanField(ActivitypubFieldMixin, models.BooleanField):
    """activitypub-aware boolean field"""


class IntegerField(ActivitypubFieldMixin, models.IntegerField):
    """activitypub-aware integer field"""


class DecimalField(ActivitypubFieldMixin, models.DecimalField):
    """activitypub-aware decimal field"""

    def field_to_activity(self, value):
        """decimals are serialized as floats"""
        # NOTE: a zero value is falsy, so it is serialized as None as well
        if not value:
            return None
        return float(value)