takahe/api/views/oauth.py

123 lines
4.5 KiB
Python
Raw Normal View History

2022-12-11 04:03:14 +00:00
import secrets
2022-12-19 22:06:40 +00:00
from urllib.parse import urlparse, urlunparse
2022-12-11 04:03:14 +00:00
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpResponseRedirect, JsonResponse
from django.shortcuts import render
2022-12-11 04:03:14 +00:00
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import TemplateView, View
from api.models import Application, Token
2022-12-14 03:03:06 +00:00
from api.parser import FormOrJsonParser
2022-12-11 04:03:14 +00:00
class OauthRedirect(HttpResponseRedirect):
def __init__(self, redirect_uri, key, value):
2022-12-19 22:06:40 +00:00
url_parts = urlparse(redirect_uri)
self.allowed_schemes = [url_parts.scheme]
# Either add or join the query section
url_parts = list(url_parts)
if url_parts[4]:
url_parts[4] = url_parts[4] + f"&{key}={value}"
else:
url_parts[4] = f"{key}={value}"
super().__init__(urlunparse(url_parts))
2022-12-11 04:03:14 +00:00
class AuthorizationView(LoginRequiredMixin, TemplateView):
"""
Asks the user to authorize access.
Could maybe be a FormView, but things are weird enough we just handle the
POST manually.
"""
template_name = "api/oauth_authorize.html"
def get_context_data(self):
redirect_uri = self.request.GET["redirect_uri"]
scope = self.request.GET.get("scope", "read")
try:
application = Application.objects.get(
client_id=self.request.GET["client_id"]
)
except (Application.DoesNotExist, KeyError):
return OauthRedirect(redirect_uri, "error", "invalid_application")
return {
"application": application,
"redirect_uri": redirect_uri,
"scope": scope,
"identities": self.request.user.identities.all(),
}
def post(self, request):
2022-12-14 03:03:06 +00:00
post_data = FormOrJsonParser().parse_body(request)
2022-12-11 04:03:14 +00:00
# Grab the application and other details again
2022-12-14 03:03:06 +00:00
redirect_uri = post_data["redirect_uri"]
scope = post_data["scope"]
application = Application.objects.get(client_id=post_data["client_id"])
2022-12-11 04:03:14 +00:00
# Get the identity
2022-12-14 03:03:06 +00:00
identity = self.request.user.identities.get(pk=post_data["identity"])
2022-12-11 04:03:14 +00:00
# Make a token
token = Token.objects.create(
application=application,
user=self.request.user,
identity=identity,
token=secrets.token_urlsafe(32),
code=secrets.token_urlsafe(16),
scopes=scope.split(),
)
# If it's an out of band request, show the code
if redirect_uri == "urn:ietf:wg:oauth:2.0:oob":
return render(request, "api/oauth_code.html", {"code": token.code})
2022-12-11 04:03:14 +00:00
# Redirect with the token's code
return OauthRedirect(redirect_uri, "code", token.code)
@method_decorator(csrf_exempt, name="dispatch")
class TokenView(View):
def post(self, request):
2022-12-14 03:03:06 +00:00
post_data = FormOrJsonParser().parse_body(request)
grant_type = post_data.get("grant_type")
if grant_type not in (
"authorization_code",
"client_credentials",
):
return JsonResponse({"error": "invalid_grant_type"}, status=400)
2022-12-14 03:03:06 +00:00
2022-12-11 04:03:14 +00:00
try:
2022-12-14 03:03:06 +00:00
application = Application.objects.get(client_id=post_data["client_id"])
2022-12-11 04:03:14 +00:00
except (Application.DoesNotExist, KeyError):
return JsonResponse({"error": "invalid_client_id"}, status=400)
# TODO: Implement client credentials flow
if grant_type == "client_credentials":
return JsonResponse({"error": "invalid_grant_type"}, status=400)
elif grant_type == "authorization_code":
code = post_data.get("code")
if not code:
return JsonResponse({"error": "invalid_code"}, status=400)
2022-12-11 04:03:14 +00:00
# Retrieve the token by code
# TODO: Check code expiry based on created date
try:
token = Token.objects.get(code=code, application=application)
except Token.DoesNotExist:
return JsonResponse({"error": "invalid_code"}, status=400)
# Update the token to remove its code
token.code = None
token.save()
# Return them the token
return JsonResponse(
{
"access_token": token.token,
"token_type": "Bearer",
"scope": " ".join(token.scopes),
"created_at": int(token.created.timestamp()),
}
)
class RevokeTokenView(View):
pass