mirror of
https://github.com/jointakahe/takahe.git
synced 2024-11-22 15:21:01 +00:00
71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
import asyncio
|
|
import time
|
|
import traceback
|
|
import uuid
|
|
|
|
from asgiref.sync import sync_to_async
|
|
from django.http import HttpResponse
|
|
from django.views import View
|
|
|
|
from miniq.models import Task
|
|
from users.models import Identity
|
|
|
|
|
|
class QueueProcessor(View):
|
|
"""
|
|
A view that takes some items off the queue and processes them.
|
|
Tries to limit its own runtime so it's within HTTP timeout limits.
|
|
"""
|
|
|
|
START_TIMEOUT = 30
|
|
TOTAL_TIMEOUT = 60
|
|
MAX_TASKS = 10
|
|
|
|
async def get(self, request):
|
|
start_time = time.monotonic()
|
|
processor_id = uuid.uuid4().hex
|
|
handled = 0
|
|
self.tasks = []
|
|
# For the first time period, launch tasks
|
|
while (time.monotonic() - start_time) < self.START_TIMEOUT:
|
|
# Remove completed tasks
|
|
self.tasks = [t for t in self.tasks if not t.done()]
|
|
# See if there's a new task
|
|
if len(self.tasks) < self.MAX_TASKS:
|
|
# Pop a task off the queue and run it
|
|
task = await sync_to_async(Task.get_one_available)(processor_id)
|
|
if task is not None:
|
|
self.tasks.append(asyncio.create_task(self.run_task(task)))
|
|
handled += 1
|
|
# Prevent busylooping
|
|
await asyncio.sleep(0.01)
|
|
# Then wait for tasks to finish
|
|
while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
|
|
# Remove completed tasks
|
|
self.tasks = [t for t in self.tasks if not t.done()]
|
|
if not self.tasks:
|
|
break
|
|
# Prevent busylooping
|
|
await asyncio.sleep(1)
|
|
return HttpResponse(f"{handled} tasks handled")
|
|
|
|
async def run_task(self, task):
|
|
try:
|
|
print(f"Task {task}: Starting")
|
|
handler = getattr(self, f"handle_{task.type}", None)
|
|
if handler is None:
|
|
raise ValueError(f"Cannot handle type {task.type}")
|
|
await handler(task.subject, task.payload)
|
|
await task.complete()
|
|
print(f"Task {task}: Complete")
|
|
except BaseException as e:
|
|
print(f"Task {task}: Error {e}")
|
|
traceback.print_exc()
|
|
await task.fail(f"{e}\n\n" + traceback.format_exc())
|
|
|
|
async def handle_identity_fetch(self, subject, payload):
|
|
# Get the actor URI via webfinger
|
|
actor_uri, handle = await Identity.fetch_webfinger(subject)
|
|
# Get or create the identity, then fetch
|
|
identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True)
|
|
await identity.fetch_actor()
|