mirror of
https://github.com/jointakahe/takahe.git
synced 2025-01-10 14:15:28 +00:00
Stator stats overhaul
Removes the error table, adds a stats table and admin page. Fixes #166
This commit is contained in:
parent
5e912ecac5
commit
1130c23b1e
9 changed files with 224 additions and 48 deletions
|
@ -1,20 +1,15 @@
|
|||
from django.contrib import admin
|
||||
|
||||
from stator.models import StatorError
|
||||
from stator.models import Stats
|
||||
|
||||
|
||||
@admin.register(StatorError)
|
||||
@admin.register(Stats)
|
||||
class DomainAdmin(admin.ModelAdmin):
|
||||
list_display = [
|
||||
"id",
|
||||
"date",
|
||||
"model_label",
|
||||
"instance_pk",
|
||||
"state",
|
||||
"error",
|
||||
"updated",
|
||||
]
|
||||
list_filter = ["model_label", "date"]
|
||||
ordering = ["-date"]
|
||||
ordering = ["model_label"]
|
||||
|
||||
def has_add_permission(self, request, obj=None):
|
||||
return False
|
||||
|
|
31
stator/migrations/0002_stats_delete_statorerror.py
Normal file
31
stator/migrations/0002_stats_delete_statorerror.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
# Generated by Django 4.1.4 on 2022-12-15 18:38
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("stator", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="Stats",
|
||||
fields=[
|
||||
(
|
||||
"model_label",
|
||||
models.CharField(max_length=200, primary_key=True, serialize=False),
|
||||
),
|
||||
("statistics", models.JSONField()),
|
||||
("created", models.DateTimeField(auto_now_add=True)),
|
||||
("updated", models.DateTimeField(auto_now=True)),
|
||||
],
|
||||
options={
|
||||
"verbose_name_plural": "Stats",
|
||||
},
|
||||
),
|
||||
migrations.DeleteModel(
|
||||
name="StatorError",
|
||||
),
|
||||
]
|
159
stator/models.py
159
stator/models.py
|
@ -1,5 +1,4 @@
|
|||
import datetime
|
||||
import pprint
|
||||
import traceback
|
||||
from typing import ClassVar, cast
|
||||
|
||||
|
@ -127,6 +126,19 @@ class StatorModel(models.Model):
|
|||
) -> list["StatorModel"]:
|
||||
return await sync_to_async(cls.transition_get_with_lock)(number, lock_expiry)
|
||||
|
||||
@classmethod
|
||||
async def atransition_ready_count(cls) -> int:
|
||||
"""
|
||||
Returns how many instances are "queued"
|
||||
"""
|
||||
return await (
|
||||
cls.objects.filter(
|
||||
state_locked_until__isnull=True,
|
||||
state_ready=True,
|
||||
state__in=cls.state_graph.automatic_states,
|
||||
).acount()
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def atransition_clean_locks(cls):
|
||||
await cls.objects.filter(state_locked_until__lte=timezone.now()).aupdate(
|
||||
|
@ -158,7 +170,6 @@ class StatorModel(models.Model):
|
|||
try:
|
||||
next_state = await current_state.handler(self)
|
||||
except BaseException as e:
|
||||
await StatorError.acreate_from_instance(self, e)
|
||||
await exceptions.acapture_exception(e)
|
||||
traceback.print_exc()
|
||||
else:
|
||||
|
@ -209,46 +220,126 @@ class StatorModel(models.Model):
|
|||
atransition_perform = sync_to_async(transition_perform)
|
||||
|
||||
|
||||
class StatorError(models.Model):
|
||||
class Stats(models.Model):
|
||||
"""
|
||||
Tracks any errors running the transitions.
|
||||
Meant to be cleaned out regularly. Should probably be a log.
|
||||
Tracks summary statistics of each model over time.
|
||||
"""
|
||||
|
||||
# appname.modelname (lowercased) label for the model this represents
|
||||
model_label = models.CharField(max_length=200)
|
||||
model_label = models.CharField(max_length=200, primary_key=True)
|
||||
|
||||
# The primary key of that model (probably int or str)
|
||||
instance_pk = models.CharField(max_length=200)
|
||||
statistics = models.JSONField()
|
||||
|
||||
# The state we were on
|
||||
state = models.CharField(max_length=200)
|
||||
created = models.DateTimeField(auto_now_add=True)
|
||||
updated = models.DateTimeField(auto_now=True)
|
||||
|
||||
# When it happened
|
||||
date = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
# Error name
|
||||
error = models.TextField()
|
||||
|
||||
# Error details
|
||||
error_details = models.TextField(blank=True, null=True)
|
||||
class Meta:
|
||||
verbose_name_plural = "Stats"
|
||||
|
||||
@classmethod
|
||||
async def acreate_from_instance(
|
||||
cls,
|
||||
instance: StatorModel,
|
||||
exception: BaseException | None = None,
|
||||
):
|
||||
detail = traceback.format_exc()
|
||||
if exception and len(exception.args) > 1:
|
||||
detail += "\n\n" + "\n\n".join(
|
||||
pprint.pformat(arg) for arg in exception.args
|
||||
)
|
||||
def get_for_model(cls, model: type[StatorModel]) -> "Stats":
|
||||
instance = cls.objects.filter(model_label=model._meta.label_lower).first()
|
||||
if instance is None:
|
||||
instance = cls(model_label=model._meta.label_lower)
|
||||
if not instance.statistics:
|
||||
instance.statistics = {}
|
||||
# Ensure there are the right keys
|
||||
for key in ["queued", "hourly", "daily", "monthly"]:
|
||||
if key not in instance.statistics:
|
||||
instance.statistics[key] = {}
|
||||
return instance
|
||||
|
||||
return await cls.objects.acreate(
|
||||
model_label=instance._meta.label_lower,
|
||||
instance_pk=str(instance.pk),
|
||||
state=instance.state,
|
||||
error=str(exception),
|
||||
error_details=detail,
|
||||
@classmethod
|
||||
async def aget_for_model(cls, model: type[StatorModel]) -> "Stats":
|
||||
return await sync_to_async(cls.get_for_model)(model)
|
||||
|
||||
def set_queued(self, number: int):
|
||||
"""
|
||||
Sets the current queued amount.
|
||||
|
||||
The queue is an instantaneous value (a "gauge") rather than a
|
||||
sum ("counter"). It's mostly used for reporting what things are right
|
||||
now, but basic trend analysis is also used to see if we think the
|
||||
queue is backing up.
|
||||
"""
|
||||
self.statistics["queued"][
|
||||
int(timezone.now().replace(second=0, microsecond=0).timestamp())
|
||||
] = number
|
||||
|
||||
def add_handled(self, number: int):
|
||||
"""
|
||||
Adds the "handled" number to the current stats.
|
||||
"""
|
||||
hour = timezone.now().replace(minute=0, second=0, microsecond=0)
|
||||
day = hour.replace(hour=0)
|
||||
hour_timestamp = str(int(hour.timestamp()))
|
||||
day_timestamp = str(int(day.timestamp()))
|
||||
month_timestamp = str(int(day.replace(day=1).timestamp()))
|
||||
self.statistics["hourly"][hour_timestamp] = (
|
||||
self.statistics["hourly"].get(hour_timestamp, 0) + number
|
||||
)
|
||||
self.statistics["daily"][day_timestamp] = (
|
||||
self.statistics["daily"].get(day_timestamp, 0) + number
|
||||
)
|
||||
self.statistics["monthly"][month_timestamp] = (
|
||||
self.statistics["monthly"].get(month_timestamp, 0) + number
|
||||
)
|
||||
|
||||
def trim_data(self):
|
||||
"""
|
||||
Removes excessively old data from the field
|
||||
"""
|
||||
queued_horizon = int((timezone.now() - datetime.timedelta(hours=2)).timestamp())
|
||||
hourly_horizon = int(
|
||||
(timezone.now() - datetime.timedelta(hours=50)).timestamp()
|
||||
)
|
||||
daily_horizon = int((timezone.now() - datetime.timedelta(days=62)).timestamp())
|
||||
monthly_horizon = int(
|
||||
(timezone.now() - datetime.timedelta(days=3653)).timestamp()
|
||||
)
|
||||
self.statistics["queued"] = {
|
||||
ts: v
|
||||
for ts, v in self.statistics["queued"].items()
|
||||
if int(ts) >= queued_horizon
|
||||
}
|
||||
self.statistics["hourly"] = {
|
||||
ts: v
|
||||
for ts, v in self.statistics["hourly"].items()
|
||||
if int(ts) >= hourly_horizon
|
||||
}
|
||||
self.statistics["daily"] = {
|
||||
ts: v
|
||||
for ts, v in self.statistics["daily"].items()
|
||||
if int(ts) >= daily_horizon
|
||||
}
|
||||
self.statistics["monthly"] = {
|
||||
ts: v
|
||||
for ts, v in self.statistics["monthly"].items()
|
||||
if int(ts) >= monthly_horizon
|
||||
}
|
||||
|
||||
def most_recent_queued(self) -> int:
|
||||
"""
|
||||
Returns the most recent number of how many were queued
|
||||
"""
|
||||
queued = [(int(ts), v) for ts, v in self.statistics["queued"].items()]
|
||||
queued.sort(reverse=True)
|
||||
if queued:
|
||||
return queued[0][1]
|
||||
else:
|
||||
return 0
|
||||
|
||||
def most_recent_handled(self) -> tuple[int, int, int]:
|
||||
"""
|
||||
Returns the current handling numbers for hour, day, month
|
||||
"""
|
||||
hour = timezone.now().replace(minute=0, second=0, microsecond=0)
|
||||
day = hour.replace(hour=0)
|
||||
hour_timestamp = str(int(hour.timestamp()))
|
||||
day_timestamp = str(int(day.timestamp()))
|
||||
month_timestamp = str(int(day.replace(day=1).timestamp()))
|
||||
return (
|
||||
self.statistics["hourly"].get(hour_timestamp, 0),
|
||||
self.statistics["daily"].get(day_timestamp, 0),
|
||||
self.statistics["monthly"].get(month_timestamp, 0),
|
||||
)
|
||||
|
|
|
@ -4,12 +4,12 @@ import time
|
|||
import traceback
|
||||
import uuid
|
||||
|
||||
from asgiref.sync import async_to_sync
|
||||
from asgiref.sync import async_to_sync, sync_to_async
|
||||
from django.utils import timezone
|
||||
|
||||
from core import exceptions, sentry
|
||||
from core.models import Config
|
||||
from stator.models import StatorModel
|
||||
from stator.models import StatorModel, Stats
|
||||
|
||||
|
||||
class StatorRunner:
|
||||
|
@ -39,7 +39,7 @@ class StatorRunner:
|
|||
|
||||
async def run(self):
|
||||
sentry.set_takahe_app("stator")
|
||||
self.handled = 0
|
||||
self.handled = {}
|
||||
self.started = time.monotonic()
|
||||
self.last_clean = time.monotonic() - self.schedule_interval
|
||||
self.tasks = []
|
||||
|
@ -52,7 +52,9 @@ class StatorRunner:
|
|||
if (time.monotonic() - self.last_clean) >= self.schedule_interval:
|
||||
# Refresh the config
|
||||
Config.system = await Config.aload_system()
|
||||
print(f"{self.handled} tasks processed so far")
|
||||
print("Tasks processed this loop:")
|
||||
for label, number in self.handled.items():
|
||||
print(f" {label}: {number}")
|
||||
print("Running cleaning and scheduling")
|
||||
await self.run_scheduling()
|
||||
|
||||
|
@ -91,10 +93,23 @@ class StatorRunner:
|
|||
"""
|
||||
with sentry.start_transaction(op="task", name="stator.run_scheduling"):
|
||||
for model in self.models:
|
||||
asyncio.create_task(self.submit_stats(model))
|
||||
asyncio.create_task(model.atransition_clean_locks())
|
||||
asyncio.create_task(model.atransition_schedule_due())
|
||||
self.last_clean = time.monotonic()
|
||||
|
||||
async def submit_stats(self, model):
|
||||
"""
|
||||
Pop some statistics into the database
|
||||
"""
|
||||
stats_instance = await Stats.aget_for_model(model)
|
||||
if stats_instance.model_label in self.handled:
|
||||
stats_instance.add_handled(self.handled[stats_instance.model_label])
|
||||
del self.handled[stats_instance.model_label]
|
||||
stats_instance.set_queued(await model.atransition_ready_count())
|
||||
stats_instance.trim_data()
|
||||
await sync_to_async(stats_instance.save)()
|
||||
|
||||
async def fetch_and_process_tasks(self):
|
||||
# Calculate space left for tasks
|
||||
space_remaining = self.concurrency - len(self.tasks)
|
||||
|
@ -110,7 +125,9 @@ class StatorRunner:
|
|||
self.tasks.append(
|
||||
asyncio.create_task(self.run_transition(instance))
|
||||
)
|
||||
self.handled += 1
|
||||
self.handled[model._meta.label_lower] = (
|
||||
self.handled.get(model._meta.label_lower, 0) + 1
|
||||
)
|
||||
space_remaining -= 1
|
||||
|
||||
async def run_transition(self, instance: StatorModel):
|
||||
|
|
|
@ -127,6 +127,11 @@ urlpatterns = [
|
|||
"admin/hashtags/<hashtag>/delete/",
|
||||
admin.HashtagDelete.as_view(),
|
||||
),
|
||||
path(
|
||||
"admin/stator/",
|
||||
admin.Stator.as_view(),
|
||||
name="admin_stator",
|
||||
),
|
||||
# Identity views
|
||||
path("@<handle>/", identity.ViewIdentity.as_view()),
|
||||
path("@<handle>/inbox/", activitypub.Inbox.as_view()),
|
||||
|
|
13
templates/admin/stator.html
Normal file
13
templates/admin/stator.html
Normal file
|
@ -0,0 +1,13 @@
|
|||
{% extends "settings/base.html" %}
|
||||
|
||||
{% block subtitle %}Stator{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
{% for model, stats in model_stats.items %}
|
||||
<fieldset>
|
||||
<legend>{{ model }}</legend>
|
||||
<p><b>Pending:</b> {{ stats.most_recent_queued }}</p>
|
||||
<p><b>Processed today:</b> {{ stats.most_recent_handled.1 }}</p>
|
||||
</fieldset>
|
||||
{% endfor %}
|
||||
{% endblock %}
|
|
@ -42,6 +42,9 @@
|
|||
<a href="{% url "admin_tuning" %}" {% if section == "tuning" %}class="selected"{% endif %} title="Tuning">
|
||||
<i class="fa-solid fa-wrench"></i> Tuning
|
||||
</a>
|
||||
<a href="{% url "admin_stator" %}" {% if section == "stator" %}class="selected"{% endif %} title="Stator">
|
||||
<i class="fa-solid fa-clock-rotate-left"></i> Stator
|
||||
</a>
|
||||
<a href="/djadmin" title="">
|
||||
<i class="fa-solid fa-gear"></i> Django Admin
|
||||
</a>
|
||||
|
|
|
@ -22,6 +22,7 @@ from users.views.admin.settings import ( # noqa
|
|||
PoliciesSettings,
|
||||
TuningSettings,
|
||||
)
|
||||
from users.views.admin.stator import Stator # noqa
|
||||
|
||||
|
||||
@method_decorator(admin_required, name="dispatch")
|
||||
|
|
20
users/views/admin/stator.py
Normal file
20
users/views/admin/stator.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
from django.utils.decorators import method_decorator
|
||||
from django.views.generic import TemplateView
|
||||
|
||||
from stator.models import StatorModel, Stats
|
||||
from users.decorators import admin_required
|
||||
|
||||
|
||||
@method_decorator(admin_required, name="dispatch")
|
||||
class Stator(TemplateView):
|
||||
|
||||
template_name = "admin/stator.html"
|
||||
|
||||
def get_context_data(self):
|
||||
return {
|
||||
"model_stats": {
|
||||
model._meta.verbose_name_plural.title(): Stats.get_for_model(model)
|
||||
for model in StatorModel.subclasses
|
||||
},
|
||||
"section": "stator",
|
||||
}
|
Loading…
Reference in a new issue