mirror of
https://github.com/jointakahe/takahe.git
synced 2024-11-28 18:21:00 +00:00
c391e7bc41
Well, in one direction anyway
121 lines
3.9 KiB
Python
121 lines
3.9 KiB
Python
import base64
|
|
import json
|
|
from typing import Dict, List, Literal, TypedDict
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
from cryptography.hazmat.primitives import hashes
|
|
from django.http import HttpRequest
|
|
from django.utils.http import http_date
|
|
|
|
from users.models import Identity
|
|
|
|
|
|
class HttpSignature:
|
|
"""
|
|
Allows for calculation and verification of HTTP signatures
|
|
"""
|
|
|
|
@classmethod
|
|
def calculate_digest(cls, data, algorithm="sha-256") -> str:
|
|
"""
|
|
Calculates the digest header value for a given HTTP body
|
|
"""
|
|
if algorithm == "sha-256":
|
|
digest = hashes.Hash(hashes.SHA256())
|
|
digest.update(data)
|
|
return "SHA-256=" + base64.b64encode(digest.finalize()).decode("ascii")
|
|
else:
|
|
raise ValueError(f"Unknown digest algorithm {algorithm}")
|
|
|
|
@classmethod
|
|
def headers_from_request(cls, request: HttpRequest, header_names: List[str]) -> str:
|
|
"""
|
|
Creates the to-be-signed header payload from a Django request
|
|
"""
|
|
headers = {}
|
|
for header_name in header_names:
|
|
if header_name == "(request-target)":
|
|
value = f"post {request.path}"
|
|
elif header_name == "content-type":
|
|
value = request.META["CONTENT_TYPE"]
|
|
else:
|
|
value = request.META[f"HTTP_{header_name.upper()}"]
|
|
headers[header_name] = value
|
|
return "\n".join(f"{name.lower()}: {value}" for name, value in headers.items())
|
|
|
|
@classmethod
|
|
def parse_signature(cls, signature: str) -> "SignatureDetails":
|
|
bits = {}
|
|
for item in signature.split(","):
|
|
name, value = item.split("=", 1)
|
|
value = value.strip('"')
|
|
bits[name.lower()] = value
|
|
signature_details: SignatureDetails = {
|
|
"headers": bits["headers"].split(),
|
|
"signature": base64.b64decode(bits["signature"]),
|
|
"algorithm": bits["algorithm"],
|
|
"keyid": bits["keyid"],
|
|
}
|
|
return signature_details
|
|
|
|
@classmethod
|
|
def compile_signature(cls, details: "SignatureDetails") -> str:
|
|
value = f'keyId="{details["keyid"]}",headers="'
|
|
value += " ".join(h.lower() for h in details["headers"])
|
|
value += '",signature="'
|
|
value += base64.b64encode(details["signature"]).decode("ascii")
|
|
value += f'",algorithm="{details["algorithm"]}"'
|
|
return value
|
|
|
|
@classmethod
|
|
async def signed_request(
|
|
self,
|
|
uri: str,
|
|
body: Dict,
|
|
identity: Identity,
|
|
content_type: str = "application/json",
|
|
method: Literal["post"] = "post",
|
|
):
|
|
"""
|
|
Performs an async request to the given path, with a document, signed
|
|
as an identity.
|
|
"""
|
|
uri_parts = urlparse(uri)
|
|
date_string = http_date()
|
|
body_bytes = json.dumps(body).encode("utf8")
|
|
headers = {
|
|
"(request-target)": f"{method} {uri_parts.path}",
|
|
"Host": uri_parts.hostname,
|
|
"Date": date_string,
|
|
"Digest": self.calculate_digest(body_bytes),
|
|
"Content-Type": content_type,
|
|
}
|
|
signed_string = "\n".join(
|
|
f"{name.lower()}: {value}" for name, value in headers.items()
|
|
)
|
|
headers["Signature"] = self.compile_signature(
|
|
{
|
|
"keyid": identity.key_id,
|
|
"headers": list(headers.keys()),
|
|
"signature": identity.sign(signed_string),
|
|
"algorithm": "rsa-sha256",
|
|
}
|
|
)
|
|
del headers["(request-target)"]
|
|
async with httpx.AsyncClient() as client:
|
|
print(f"Calling {method} {uri}")
|
|
print(body)
|
|
return await client.request(
|
|
method,
|
|
uri,
|
|
headers=headers,
|
|
content=body_bytes,
|
|
)
|
|
|
|
|
|
class SignatureDetails(TypedDict):
|
|
algorithm: str
|
|
headers: List[str]
|
|
signature: bytes
|
|
keyid: str
|