# takahe/users/models/domain.py
#
# 183 lines
# 6.2 KiB
# Python

import json
from typing import Optional
import httpx
import urlman
from asgiref.sync import sync_to_async
from django.conf import settings
from django.db import models
from stator.models import State, StateField, StateGraph, StatorModel
from users.schemas import NodeInfo
class DomainStates(StateGraph):
    """
    State graph driving the periodic refresh of a domain's nodeinfo.

    Four states are declared: ``outdated``, ``updated``, ``connection_issue``
    and ``purged``. Only ``outdated`` and ``updated`` have handlers in this
    class; ``connection_issue`` is declared with ``externally_progressed=True``
    and ``purged`` has no outgoing transitions.
    """

    # NOTE(review): the intervals below look like seconds (30 minutes and
    # 24 hours) - confirm against stator's State implementation.
    outdated = State(try_interval=60 * 30, force_initial=True)
    updated = State(try_interval=60 * 60 * 24, attempt_immediately=False)
    connection_issue = State(externally_progressed=True)
    purged = State()

    # Refresh cycle between the two handled states.
    outdated.transitions_to(updated)
    updated.transitions_to(outdated)

    # Ways out of "outdated" other than a successful refresh.
    outdated.transitions_to(connection_issue)
    outdated.transitions_to(purged)

    # Ways out of "connection_issue".
    connection_issue.transitions_to(outdated)
    connection_issue.transitions_to(purged)

    # A domain left in "outdated" for this long (presumably 24 hours - TODO
    # confirm the unit) is moved to "connection_issue".
    outdated.times_out_to(connection_issue, 60 * 60 * 24)

    @classmethod
    async def handle_outdated(cls, instance: "Domain"):
        """
        Try to refresh the stored nodeinfo for ``instance``.

        Returns ``cls.updated`` once fresh nodeinfo has been fetched and
        saved on the instance. Returns ``None`` when the fetch produced
        nothing, which names no new state (presumably the domain stays in
        ``outdated`` and is retried - verify against stator).
        """
        nodeinfo = await instance.fetch_nodeinfo()
        if not nodeinfo:
            return None

        instance.nodeinfo = nodeinfo.dict()
        # Model.save is synchronous, so hop out of the async context for it.
        await sync_to_async(instance.save)()
        return cls.updated

    @classmethod
    async def handle_updated(cls, instance: "Domain"):
        """
        Unconditionally send the domain back to ``outdated``.
        """
        return cls.outdated
class Domain(StatorModel):
    """
    Represents a domain that a user can have an account on.

    For protocol reasons, if we want to allow custom usernames
    per domain, each "display" domain (the one in the handle) must either let
    us serve on it directly, or have a "service" domain that maps
    to it uniquely that we can serve on that.

    That way, someone coming in with just an Actor URI as their
    entrypoint can still try to webfinger preferredUsername@actorDomain
    and we can return an appropriate response.

    It's possible to just have one domain do both jobs, of course.

    This model also represents _other_ servers' domains, which we treat as
    display domains for now, until we start doing better probing.
    """

    # The display domain; doubles as the primary key
    domain = models.CharField(max_length=250, primary_key=True)

    # Optional separate domain we serve on; unique across all rows when set
    service_domain = models.CharField(
        max_length=250,
        null=True,
        blank=True,
        db_index=True,
        unique=True,
    )

    state = StateField(DomainStates)

    # nodeinfo 2.0 detail about the remote server
    nodeinfo = models.JSONField(null=True, blank=True)

    # If we own this domain
    local = models.BooleanField()

    # If we have blocked this domain from interacting with us
    blocked = models.BooleanField(default=False)

    # Domains can be joinable by any user of the instance (as the default one
    # should)
    public = models.BooleanField(default=False)

    # If this is the default domain (shown as the default entry for new users)
    default = models.BooleanField(default=False)

    # Domains can also be linked to one or more users for their private use
    # This should be display domains ONLY
    users = models.ManyToManyField("users.User", related_name="domains", blank=True)

    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)

    class urls(urlman.Urls):
        root = "/admin/domains/"
        create = "/admin/domains/create/"
        edit = "/admin/domains/{self.domain}/"
        delete = "{edit}delete/"
        root_federation = "/admin/federation/"
        edit_federation = "/admin/federation/{self.domain}/"

    @classmethod
    def get_remote_domain(cls, domain: str) -> "Domain":
        """
        Return the non-local Domain row for ``domain`` (lowercased), creating
        it if it does not exist yet.
        """
        # NOTE(review): local=False is part of the lookup, not a default. If a
        # row with this primary key exists with local=True, the lookup misses
        # and the create presumably fails on the primary key - confirm whether
        # callers can ever pass one of our own domains here.
        return cls.objects.get_or_create(domain=domain.lower(), local=False)[0]

    @classmethod
    def get_domain(cls, domain: str) -> Optional["Domain"]:
        """
        Look a Domain up by either its display domain or its service domain.

        The name is lowercased before matching. Returns None when no row
        matches.
        """
        try:
            return cls.objects.get(
                models.Q(domain=domain.lower())
                | models.Q(service_domain=domain.lower())
            )
        except cls.DoesNotExist:
            return None

    @property
    def uri_domain(self) -> str:
        """
        The service domain when one is set, otherwise the display domain.
        """
        if self.service_domain:
            return self.service_domain
        return self.domain

    @classmethod
    def available_for_user(cls, user):
        """
        Returns domains that are available for the user to put an identity on

        These are local domains that are either public or linked to the user,
        with default domains sorted first and then by domain name.
        """
        return cls.objects.filter(
            models.Q(public=True) | models.Q(users__id=user.id),
            local=True,
        ).order_by("-default", "domain")

    def __str__(self):
        return self.domain

    def save(self, *args, **kwargs):
        """
        Save the row after checking that its names do not collide with
        another row's names.

        Raises:
            ValueError: if ``domain`` is already in use as a service domain,
                or ``service_domain`` is already in use as a domain.
        """
        # Ensure that we are not conflicting with other domains
        if Domain.objects.filter(service_domain=self.domain).exists():
            raise ValueError(
                f"Domain {self.domain} is already a service domain elsewhere!"
            )
        if self.service_domain:
            if Domain.objects.filter(domain=self.service_domain).exists():
                raise ValueError(
                    f"Service domain {self.service_domain} is already a domain elsewhere!"
                )
        super().save(*args, **kwargs)

    async def fetch_nodeinfo(self) -> NodeInfo | None:
        """
        Fetch the /nodeinfo/2.0 document for the domain.

        Returns:
            The parsed NodeInfo, or None when the request failed without an
            HTTP response, with a status of 500 or above, or with one of
            401, 403, 404, 406 or 410.

        Raises:
            ValueError: for any other error status below 500, for a body that
                is not valid JSON, or for a JSON body that is not an object.
                Errors raised by NodeInfo's own construction are not caught
                here and propagate unchanged.
        """
        async with httpx.AsyncClient(
            timeout=settings.SETUP.REMOTE_TIMEOUT,
            headers={"User-Agent": settings.TAKAHE_USER_AGENT},
        ) as client:
            try:
                response = await client.get(
                    f"https://{self.domain}/nodeinfo/2.0",
                    follow_redirects=True,
                    headers={"Accept": "application/json"},
                )
                response.raise_for_status()
            except httpx.HTTPError as ex:
                # Only some httpx errors carry a response; transport-level
                # failures do not, hence the getattr with a None fallback.
                response = getattr(ex, "response", None)
                if (
                    response is not None
                    and response.status_code < 500
                    and response.status_code not in [401, 403, 404, 406, 410]
                ):
                    raise ValueError(
                        f"Client error fetching nodeinfo: domain={self.domain}, code={response.status_code}",
                        response.content,
                    ) from ex
                return None

            try:
                payload = response.json()
            except json.JSONDecodeError as ex:
                raise ValueError(
                    f"Client error decoding nodeinfo: domain={self.domain}, error={str(ex)}"
                ) from ex

            # Valid JSON is not necessarily an object; a list, string, number
            # or null cannot be unpacked into NodeInfo's keyword arguments and
            # would otherwise surface as a bare TypeError without the domain.
            if not isinstance(payload, dict):
                raise ValueError(
                    f"Client error decoding nodeinfo: domain={self.domain}, "
                    f"error=expected a JSON object, got {type(payload).__name__}"
                )

            return NodeInfo(**payload)