moviewyrm/bookwyrm/models/base_model.py
Vivianne Langdon ebf463fc91 Generation of slugs and new urls to handle slugs
- TODO: redirect to correct slug if not found.
2022-03-02 00:21:23 -08:00

202 lines
6.7 KiB
Python

""" base model with default fields """
import base64
from Crypto import Random
from django.core.exceptions import PermissionDenied
from django.db import models
from django.db.models import Q
from django.dispatch import receiver
from django.http import Http404
from django.utils.translation import gettext_lazy as _
from django.utils.text import slugify
from bookwyrm.settings import DOMAIN
from .fields import RemoteIdField
DeactivationReason = [
("pending", _("Pending")),
("self_deletion", _("Self deletion")),
("moderator_suspension", _("Moderator suspension")),
("moderator_deletion", _("Moderator deletion")),
("domain_block", _("Domain block")),
]
def new_access_code():
"""the identifier for a user invite"""
return base64.b32encode(Random.get_random_bytes(5)).decode("ascii")
class BookWyrmModel(models.Model):
"""shared fields"""
created_date = models.DateTimeField(auto_now_add=True)
updated_date = models.DateTimeField(auto_now=True)
remote_id = RemoteIdField(null=True, activitypub_field="id")
def get_permalink(self):
"""generate the url that resolves to the local object, without a slug"""
base_path = f"https://{DOMAIN}"
if hasattr(self, "user"):
base_path = f"{base_path}{self.user.local_path}"
model_name = type(self).__name__.lower()
return f"{base_path}/{model_name}/{self.id}"
def get_remote_id(self):
"""generate a url that resolves to the local object, with a slug suffix"""
path = self.get_permalink()
name = None
if hasattr(self, "name_field"):
name = getattr(self, self.name_field)
elif hasattr(self, "name"):
name = self.name
if name:
slug = slugify(name)
path = f"{path}/s/{slug}"
return path
class Meta:
"""this is just here to provide default fields for other models"""
abstract = True
@property
def local_path(self):
"""how to link to this object in the local app"""
return self.get_remote_id().replace(f"https://{DOMAIN}", "")
def raise_visible_to_user(self, viewer):
"""is a user authorized to view an object?"""
# make sure this is an object with privacy owned by a user
if not hasattr(self, "user") or not hasattr(self, "privacy"):
return
# viewer can't see it if the object's owner blocked them
if viewer in self.user.blocks.all():
raise Http404()
# you can see your own posts and any public or unlisted posts
if viewer == self.user or self.privacy in ["public", "unlisted"]:
return
# you can see the followers only posts of people you follow
if self.privacy == "followers" and (
self.user.followers.filter(id=viewer.id).first()
):
return
# you can see dms you are tagged in
if hasattr(self, "mention_users"):
if (
self.privacy in ["direct", "followers"]
and self.mention_users.filter(id=viewer.id).first()
):
return
# you can see groups of which you are a member
if (
hasattr(self, "memberships")
and viewer.is_authenticated
and self.memberships.filter(user=viewer).exists()
):
return
# you can see objects which have a group of which you are a member
if hasattr(self, "group"):
if (
hasattr(self.group, "memberships")
and self.group.memberships.filter(user=viewer).exists()
):
return
raise Http404()
def raise_not_editable(self, viewer):
"""does this user have permission to edit this object? liable to be overwritten
by models that inherit this base model class"""
if not hasattr(self, "user"):
return
# generally moderators shouldn't be able to edit other people's stuff
if self.user == viewer:
return
raise PermissionDenied()
def raise_not_deletable(self, viewer):
"""does this user have permission to delete this object? liable to be
overwritten by models that inherit this base model class"""
if not hasattr(self, "user"):
return
# but generally moderators can delete other people's stuff
if self.user == viewer or viewer.has_perm("moderate_post"):
return
raise PermissionDenied()
@classmethod
def privacy_filter(cls, viewer, privacy_levels=None):
"""filter objects that have "user" and "privacy" fields"""
queryset = cls.objects
if hasattr(queryset, "select_subclasses"):
queryset = queryset.select_subclasses()
privacy_levels = privacy_levels or ["public", "unlisted", "followers", "direct"]
# you can't see followers only or direct messages if you're not logged in
if viewer.is_anonymous:
privacy_levels = [
p for p in privacy_levels if not p in ["followers", "direct"]
]
else:
# exclude blocks from both directions
queryset = queryset.exclude(
Q(user__blocked_by=viewer) | Q(user__blocks=viewer)
)
# filter to only provided privacy levels
queryset = queryset.filter(privacy__in=privacy_levels)
if "followers" in privacy_levels:
queryset = cls.followers_filter(queryset, viewer)
# exclude direct messages not intended for the user
if "direct" in privacy_levels:
queryset = cls.direct_filter(queryset, viewer)
return queryset
@classmethod
def followers_filter(cls, queryset, viewer):
"""Override-able filter for "followers" privacy level"""
return queryset.exclude(
~Q( # user isn't following and it isn't their own status
Q(user__followers=viewer) | Q(user=viewer)
),
privacy="followers", # and the status is followers only
)
@classmethod
def direct_filter(cls, queryset, viewer):
"""Override-able filter for "direct" privacy level"""
return queryset.exclude(~Q(user=viewer), privacy="direct")
@receiver(models.signals.post_save)
# pylint: disable=unused-argument
def set_remote_id(sender, instance, created, *args, **kwargs):
"""set the remote_id after save (when the id is available)"""
if not created or not hasattr(instance, "get_remote_id"):
return
if not instance.remote_id:
instance.remote_id = instance.get_remote_id()
try:
instance.save(broadcast=False)
except TypeError:
instance.save()