mirror of
https://github.com/jointakahe/takahe.git
synced 2025-01-07 04:35:25 +00:00
349 lines
11 KiB
Python
349 lines
11 KiB
Python
import mimetypes
|
|
from functools import partial
|
|
from typing import ClassVar
|
|
|
|
import httpx
|
|
import urlman
|
|
from cachetools import TTLCache, cached
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files.base import ContentFile
|
|
from django.db import models
|
|
from django.utils.safestring import mark_safe
|
|
from PIL import Image
|
|
|
|
from core.files import get_remote_file
|
|
from core.html import FediverseHtmlParser
|
|
from core.ld import format_ld_date
|
|
from core.models import Config
|
|
from core.uploads import upload_emoji_namer
|
|
from core.uris import (
|
|
AutoAbsoluteUrl,
|
|
ProxyAbsoluteUrl,
|
|
RelativeAbsoluteUrl,
|
|
StaticAbsoluteUrl,
|
|
)
|
|
from stator.models import State, StateField, StateGraph, StatorModel
|
|
from users.models import Domain
|
|
|
|
|
|
class EmojiStates(StateGraph):
|
|
outdated = State(try_interval=300, force_initial=True)
|
|
updated = State()
|
|
|
|
outdated.transitions_to(updated)
|
|
|
|
@classmethod
|
|
def handle_outdated(cls, instance: "Emoji"):
|
|
"""
|
|
Fetches remote emoji and uploads to file for local caching
|
|
"""
|
|
if instance.remote_url and not instance.file:
|
|
try:
|
|
file, mimetype = get_remote_file(
|
|
instance.remote_url,
|
|
timeout=settings.SETUP.REMOTE_TIMEOUT,
|
|
max_size=settings.SETUP.EMOJI_MAX_IMAGE_FILESIZE_KB * 1024,
|
|
)
|
|
except httpx.RequestError:
|
|
return
|
|
|
|
if file:
|
|
if mimetype == "application/octet-stream":
|
|
mimetype = Image.open(file).get_format_mimetype()
|
|
|
|
instance.file = file
|
|
instance.mimetype = mimetype
|
|
instance.save()
|
|
|
|
return cls.updated
|
|
|
|
|
|
class EmojiQuerySet(models.QuerySet):
|
|
def usable(self, domain: Domain | None = None):
|
|
"""
|
|
Returns all usable emoji, optionally filtering by domain too.
|
|
"""
|
|
visible_q = models.Q(local=True) | models.Q(public=True)
|
|
if Config.system.emoji_unreviewed_are_public:
|
|
visible_q |= models.Q(public__isnull=True)
|
|
qs = self.filter(visible_q)
|
|
|
|
if domain:
|
|
if not domain.local:
|
|
qs = qs.filter(domain=domain)
|
|
|
|
return qs
|
|
|
|
|
|
class EmojiManager(models.Manager):
|
|
def get_queryset(self):
|
|
return EmojiQuerySet(self.model, using=self._db)
|
|
|
|
def usable(self, domain: Domain | None = None):
|
|
return self.get_queryset().usable(domain)
|
|
|
|
|
|
class Emoji(StatorModel):
|
|
# Normalized Emoji without the ':'
|
|
shortcode = models.SlugField(max_length=100, db_index=True)
|
|
|
|
domain = models.ForeignKey(
|
|
"users.Domain", null=True, blank=True, on_delete=models.CASCADE
|
|
)
|
|
local = models.BooleanField(default=True)
|
|
|
|
# Should this be shown in the public UI?
|
|
public = models.BooleanField(null=True)
|
|
|
|
object_uri = models.CharField(max_length=500, blank=True, null=True, unique=True)
|
|
|
|
mimetype = models.CharField(max_length=200)
|
|
|
|
# Files may not be populated if it's remote and not cached on our side yet
|
|
file = models.ImageField(
|
|
upload_to=partial(upload_emoji_namer, "emoji"),
|
|
null=True,
|
|
blank=True,
|
|
)
|
|
|
|
# A link to the custom emoji
|
|
remote_url = models.CharField(max_length=500, blank=True, null=True)
|
|
|
|
# Used for sorting custom emoji in the picker
|
|
category = models.CharField(max_length=100, blank=True, null=True)
|
|
|
|
# State of this Emoji
|
|
state = StateField(EmojiStates)
|
|
|
|
created = models.DateTimeField(auto_now_add=True)
|
|
updated = models.DateTimeField(auto_now=True)
|
|
|
|
objects = EmojiManager()
|
|
|
|
# Cache of the local emojis {shortcode: Emoji}
|
|
locals: ClassVar["dict[str, Emoji]"]
|
|
|
|
class Meta:
|
|
unique_together = ("domain", "shortcode")
|
|
indexes: list = [] # We need this so Stator can add its own
|
|
|
|
class urls(urlman.Urls):
|
|
admin = "/admin/emoji/"
|
|
admin_create = "{admin}create/"
|
|
admin_edit = "{admin}{self.pk}/"
|
|
admin_delete = "{admin}{self.pk}/delete/"
|
|
admin_enable = "{admin}{self.pk}/enable/"
|
|
admin_disable = "{admin}{self.pk}/disable/"
|
|
admin_copy = "{admin}{self.pk}/copy/"
|
|
|
|
def delete(self, using=None, keep_parents=False):
|
|
if self.file:
|
|
self.file.delete()
|
|
return super().delete(using=using, keep_parents=keep_parents)
|
|
|
|
def clean(self):
|
|
super().clean()
|
|
if self.local ^ (self.domain is None):
|
|
raise ValidationError("Must be local or have a domain")
|
|
|
|
def __str__(self):
|
|
return f"{self.id}-{self.shortcode}"
|
|
|
|
@classmethod
|
|
def load_locals(cls) -> dict[str, "Emoji"]:
|
|
return {x.shortcode: x for x in Emoji.objects.usable().filter(local=True)}
|
|
|
|
@classmethod
|
|
@cached(cache=TTLCache(maxsize=1000, ttl=60))
|
|
def get_by_domain(cls, shortcode, domain: Domain | None) -> "Emoji | None":
|
|
"""
|
|
Given an emoji shortcode and optional domain, looks up the single
|
|
emoji and returns it. Raises Emoji.DoesNotExist if there isn't one.
|
|
"""
|
|
try:
|
|
if domain is None or domain.local:
|
|
return cls.objects.get(local=True, shortcode=shortcode)
|
|
else:
|
|
return cls.objects.get(domain=domain, shortcode=shortcode)
|
|
except Emoji.DoesNotExist:
|
|
return None
|
|
|
|
@property
|
|
def fullcode(self):
|
|
return f":{self.shortcode}:"
|
|
|
|
@property
|
|
def is_usable(self) -> bool:
|
|
"""
|
|
Return True if this Emoji is usable.
|
|
"""
|
|
return self.public or (
|
|
self.public is None and Config.system.emoji_unreviewed_are_public
|
|
)
|
|
|
|
def full_url_admin(self) -> RelativeAbsoluteUrl:
|
|
return self.full_url(always_show=True)
|
|
|
|
def full_url(self, always_show=False) -> RelativeAbsoluteUrl:
|
|
if self.is_usable or always_show:
|
|
if self.file:
|
|
return AutoAbsoluteUrl(self.file.url)
|
|
elif self.remote_url:
|
|
return ProxyAbsoluteUrl(
|
|
f"/proxy/emoji/{self.pk}/",
|
|
remote_url=self.remote_url,
|
|
)
|
|
return StaticAbsoluteUrl("img/blank-emoji-128.png")
|
|
|
|
def as_html(self):
|
|
if self.is_usable:
|
|
return mark_safe(
|
|
f'<img src="{self.full_url().relative}" class="emoji" alt="Emoji {self.shortcode}">'
|
|
)
|
|
return self.fullcode
|
|
|
|
@property
|
|
def can_copy_local(self):
|
|
if not hasattr(Emoji, "locals"):
|
|
Emoji.locals = Emoji.load_locals()
|
|
return not self.local and self.is_usable and self.shortcode not in Emoji.locals
|
|
|
|
def copy_to_local(self, *, save: bool = True):
|
|
"""
|
|
Copy this (non-local) Emoji to local for use by Users of this instance. Returns
|
|
the Emoji instance, or None if the copy failed to happen. Specify save=False to
|
|
return the object without saving to database (for bulk saving).
|
|
"""
|
|
if not self.can_copy_local:
|
|
return None
|
|
|
|
emoji = None
|
|
if self.file:
|
|
# new emoji gets its own copy of the file
|
|
file = ContentFile(self.file.read())
|
|
file.name = self.file.name
|
|
emoji = Emoji(
|
|
shortcode=self.shortcode,
|
|
domain=None,
|
|
local=True,
|
|
mimetype=self.mimetype,
|
|
file=file,
|
|
category=self.category,
|
|
)
|
|
if save:
|
|
emoji.save()
|
|
# add this new one to the locals cache
|
|
Emoji.locals[self.shortcode] = emoji
|
|
return emoji
|
|
|
|
@classmethod
|
|
def emojis_from_content(cls, content: str, domain: Domain | None) -> list["Emoji"]:
|
|
"""
|
|
Return a parsed and sanitized of emoji found in content without
|
|
the surrounding ':'.
|
|
"""
|
|
emoji_hits = FediverseHtmlParser(
|
|
content, find_emojis=True, emoji_domain=domain
|
|
).emojis
|
|
emojis = sorted({emoji for emoji in emoji_hits})
|
|
return list(
|
|
cls.objects.filter(local=(domain is None) or domain.local)
|
|
.usable(domain)
|
|
.filter(shortcode__in=emojis)
|
|
)
|
|
|
|
def to_ap_tag(self):
|
|
"""
|
|
Return this Emoji as an ActivityPub Tag
|
|
"""
|
|
return {
|
|
"id": self.object_uri or f"https://{settings.MAIN_DOMAIN}/emoji/{self.pk}/",
|
|
"type": "Emoji",
|
|
"name": f":{self.shortcode}:",
|
|
"icon": {
|
|
"type": "Image",
|
|
"mediaType": self.mimetype,
|
|
"url": self.full_url().absolute,
|
|
},
|
|
"updated": format_ld_date(self.updated),
|
|
}
|
|
|
|
@classmethod
|
|
def by_ap_tag(cls, domain: Domain, data: dict, create: bool = False):
|
|
""" """
|
|
try:
|
|
return cls.objects.get(object_uri=data["id"])
|
|
except cls.DoesNotExist:
|
|
if not create:
|
|
raise KeyError(f"No emoji with ID {data['id']}", data)
|
|
|
|
# Name could be a direct property, or in a language'd value
|
|
if "name" in data:
|
|
name = data["name"]
|
|
elif "nameMap" in data and "und" in data["nameMap"]:
|
|
name = data["nameMap"]["und"]
|
|
else:
|
|
raise ValueError("No name on emoji JSON")
|
|
|
|
icon = data["icon"]
|
|
|
|
mimetype = icon.get("mediaType")
|
|
if not mimetype:
|
|
mimetype, _ = mimetypes.guess_type(icon["url"])
|
|
|
|
# create
|
|
shortcode = name.strip(":")
|
|
category = (icon.get("category") or "")[:100]
|
|
|
|
if not domain.local:
|
|
try:
|
|
emoji = cls.objects.get(shortcode=shortcode, domain=domain)
|
|
except cls.DoesNotExist:
|
|
pass
|
|
else:
|
|
# default to previously discovered mimetype if not provided
|
|
# by the instance to avoid infinite outdated state
|
|
if mimetype is None:
|
|
mimetype = emoji.mimetype
|
|
|
|
# Domain previously provided this shortcode. Trample in the new emoji
|
|
if emoji.remote_url != icon["url"] or emoji.mimetype != mimetype:
|
|
emoji.object_uri = data["id"]
|
|
emoji.remote_url = icon["url"]
|
|
emoji.mimetype = mimetype
|
|
emoji.category = category
|
|
if emoji.file:
|
|
emoji.file.delete(save=True)
|
|
else:
|
|
emoji.save()
|
|
emoji.transition_perform("outdated")
|
|
return emoji
|
|
|
|
emoji = cls.objects.create(
|
|
shortcode=shortcode,
|
|
domain=None if domain.local else domain,
|
|
local=domain.local,
|
|
object_uri=data["id"],
|
|
mimetype=mimetype or "application/octet-stream",
|
|
category=category,
|
|
remote_url=icon["url"],
|
|
)
|
|
return emoji
|
|
|
|
### Mastodon API ###
|
|
|
|
def to_mastodon_json(self):
|
|
url = self.full_url().absolute
|
|
data = {
|
|
"shortcode": self.shortcode,
|
|
"url": url,
|
|
"static_url": self.remote_url or url,
|
|
"visible_in_picker": (
|
|
Config.system.emoji_unreviewed_are_public
|
|
if self.public is None
|
|
else self.public
|
|
),
|
|
"category": self.category or "",
|
|
}
|
|
return data
|