diff --git a/activities/migrations/0001_initial.py b/activities/migrations/0001_initial.py index fa7ba2a..437a580 100644 --- a/activities/migrations/0001_initial.py +++ b/activities/migrations/0001_initial.py @@ -263,6 +263,7 @@ class Migration(migrations.Migration): ("undo_interaction", "Undo Interaction"), ("identity_edited", "Identity Edited"), ("identity_deleted", "Identity Deleted"), + ("identity_created", "Identity Created"), ], max_length=100, ), @@ -325,6 +326,7 @@ class Migration(migrations.Migration): ("followed", "Followed"), ("boosted", "Boosted"), ("announcement", "Announcement"), + ("identity_created", "Identity Created"), ], max_length=100, ), diff --git a/activities/migrations/0010_stator_indexes.py b/activities/migrations/0010_stator_indexes.py new file mode 100644 index 0000000..f5a2db2 --- /dev/null +++ b/activities/migrations/0010_stator_indexes.py @@ -0,0 +1,55 @@ +# Generated by Django 4.1.4 on 2023-02-04 01:05 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("activities", "0009_alter_timelineevent_index_together"), + ] + + operations = [ + migrations.AlterField( + model_name="emoji", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="fanout", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="hashtag", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="post", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="postattachment", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="postinteraction", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterIndexTogether( + name="fanout", + index_together={("state_ready", "state_locked_until", "state")}, + ), + migrations.AlterIndexTogether( + name="hashtag", + index_together={("state_ready", "state_locked_until", "state")}, + ), + migrations.AlterIndexTogether( + name="postattachment", + index_together={("state_ready", "state_locked_until", "state")}, + ), + ] diff --git a/stator/models.py b/stator/models.py index 95b5f5b..4d20574 100644 --- a/stator/models.py +++ b/stator/models.py @@ -84,7 +84,7 @@ class StatorModel(models.Model): state: StateField # If this row is up for transition attempts (which it always is on creation!) - state_ready = models.BooleanField(default=True) + state_ready = models.BooleanField(default=True, db_index=True) # When the state last actually changed, or the date of instance creation state_changed = models.DateTimeField(auto_now_add=True) @@ -102,6 +102,7 @@ class StatorModel(models.Model): class Meta: abstract = True + index_together = ["state_ready", "state_locked_until", "state"] # Need this empty indexes to ensure child Models have a Meta.indexes # that will look to add indexes (that we inject with class_prepared) indexes: list = [] diff --git a/stator/runner.py b/stator/runner.py index ad4f515..7162380 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -6,7 +6,7 @@ import time import traceback import uuid -from asgiref.sync import async_to_sync, sync_to_async +from asgiref.sync import ThreadSensitiveContext, async_to_sync, sync_to_async from django.conf import settings from django.utils import timezone @@ -15,6 +15,28 @@ from core.models import Config from stator.models import StatorModel, Stats +class LoopingTask: + """ + Wrapper for having a coroutine go in the background and only have one + copy running at a time. + """ + + def __init__(self, callable): + self.callable = callable + self.task: asyncio.Task | None = None + + def run(self) -> bool: + # If we have a task object, see if we can clear it up + if self.task is not None: + if self.task.done(): + self.task = None + else: + return False + # OK, launch a new task + self.task = asyncio.create_task(self.callable()) + return True + + class StatorRunner: """ Runs tasks on models that are looking for state changes. @@ -26,7 +48,7 @@ class StatorRunner: models: list[type[StatorModel]], concurrency: int = getattr(settings, "STATOR_CONCURRENCY", 50), concurrency_per_model: int = getattr( - settings, "STATOR_CONCURRENCY_PER_MODEL", 20 + settings, "STATOR_CONCURRENCY_PER_MODEL", 15 ), liveness_file: str | None = None, schedule_interval: int = 30, @@ -53,6 +75,9 @@ class StatorRunner: self.last_clean = time.monotonic() - self.schedule_interval self.tasks = [] self.loop_delay = self.minimum_loop_delay + self.schedule_task = LoopingTask(self.run_scheduling) + self.fetch_task = LoopingTask(self.fetch_and_process_tasks) + self.config_task = LoopingTask(self.load_config) # For the first time period, launch tasks print("Running main task loop") try: @@ -64,22 +89,25 @@ class StatorRunner: # previous one is cancelled) signal.alarm(self.schedule_interval * 2) # Refresh the config - Config.system = await Config.aload_system() - print("Tasks processed this loop:") - for label, number in self.handled.items(): - print(f" {label}: {number}") - print("Running cleaning and scheduling") - await self.run_scheduling() + self.config_task.run() + if self.schedule_task.run(): + print("Running cleaning and scheduling") + else: + print("Previous scheduling still running...!") # Write liveness file if configured if self.liveness_file: with open(self.liveness_file, "w") as fh: fh.write(str(int(time.time()))) + self.last_clean = time.monotonic() # Clear the cleaning breadcrumbs/extra for the main part of the loop sentry.scope_clear(scope) self.remove_completed_tasks() - await self.fetch_and_process_tasks() + + # Fetching is kind of blocking, so we need to do this + # as a separate coroutine + self.fetch_task.run() # Are we in limited run mode? if ( @@ -122,17 +150,28 @@ class StatorRunner: print("Watchdog timeout exceeded") os._exit(2) + async def load_config(self): + """ + Refreshes config from the DB + """ + Config.system = await Config.aload_system() + async def run_scheduling(self): """ Do any transition cleanup tasks """ + if self.handled: + print("Tasks processed since last flush:") + for label, number in self.handled.items(): + print(f" {label}: {number}") + else: + print("No tasks handled since last flush.") with sentry.start_transaction(op="task", name="stator.run_scheduling"): for model in self.models: - asyncio.create_task(self.submit_stats(model)) - asyncio.create_task(model.atransition_clean_locks()) - asyncio.create_task(model.atransition_schedule_due()) - asyncio.create_task(model.atransition_delete_due()) - self.last_clean = time.monotonic() + await self.submit_stats(model) + await model.atransition_clean_locks() + await model.atransition_schedule_due() + await model.atransition_delete_due() async def submit_stats(self, model): """ @@ -171,25 +210,26 @@ class StatorRunner: Wrapper for atransition_attempt with fallback error handling """ task_name = f"stator.run_transition:{instance._meta.label_lower}#{{id}} from {instance.state}" - with sentry.start_transaction(op="task", name=task_name): - sentry.set_context( - "instance", - { - "model": instance._meta.label_lower, - "pk": instance.pk, - "state": instance.state, - "state_age": instance.state_age, - }, - ) - - try: - print( - f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}" + async with ThreadSensitiveContext(): + with sentry.start_transaction(op="task", name=task_name): + sentry.set_context( + "instance", + { + "model": instance._meta.label_lower, + "pk": instance.pk, + "state": instance.state, + "state_age": instance.state_age, + }, ) - await instance.atransition_attempt() - except BaseException as e: - await exceptions.acapture_exception(e) - traceback.print_exc() + + try: + print( + f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}" + ) + await instance.atransition_attempt() + except BaseException as e: + await exceptions.acapture_exception(e) + traceback.print_exc() def remove_completed_tasks(self): """ diff --git a/takahe/settings.py b/takahe/settings.py index efd7861..d861c42 100644 --- a/takahe/settings.py +++ b/takahe/settings.py @@ -143,8 +143,8 @@ class Settings(BaseSettings): CACHES_DEFAULT: CacheBackendUrl | None = None # Stator tuning - STATOR_CONCURRENCY: int = 100 - STATOR_CONCURRENCY_PER_MODEL: int = 40 + STATOR_CONCURRENCY: int = 50 + STATOR_CONCURRENCY_PER_MODEL: int = 15 PGHOST: str | None = None PGPORT: int | None = 5432 diff --git a/users/migrations/0013_stator_indexes.py b/users/migrations/0013_stator_indexes.py new file mode 100644 index 0000000..0cc900b --- /dev/null +++ b/users/migrations/0013_stator_indexes.py @@ -0,0 +1,64 @@ +# Generated by Django 4.1.4 on 2023-02-04 01:05 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("users", "0012_block_states"), + ] + + operations = [ + migrations.AlterField( + model_name="block", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="domain", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="follow", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="identity", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="inboxmessage", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="passwordreset", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterField( + model_name="report", + name="state_ready", + field=models.BooleanField(db_index=True, default=True), + ), + migrations.AlterIndexTogether( + name="domain", + index_together={("state_ready", "state_locked_until", "state")}, + ), + migrations.AlterIndexTogether( + name="inboxmessage", + index_together={("state_ready", "state_locked_until", "state")}, + ), + migrations.AlterIndexTogether( + name="passwordreset", + index_together={("state_ready", "state_locked_until", "state")}, + ), + migrations.AlterIndexTogether( + name="report", + index_together={("state_ready", "state_locked_until", "state")}, + ), + ]