mirror of
https://github.com/jointakahe/takahe.git
synced 2024-11-14 11:21:13 +00:00
Improve stator's performance in larger installs
This commit is contained in:
parent
9bc18a1190
commit
b2768e7f2e
2 changed files with 15 additions and 6 deletions
|
@ -81,6 +81,8 @@ class StatorModel(models.Model):
|
||||||
concrete model yourself.
|
concrete model yourself.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
SCHEDULE_BATCH_SIZE = 1000
|
||||||
|
|
||||||
state: StateField
|
state: StateField
|
||||||
|
|
||||||
# If this row is up for transition attempts (which it always is on creation!)
|
# If this row is up for transition attempts (which it always is on creation!)
|
||||||
|
@ -141,7 +143,8 @@ class StatorModel(models.Model):
|
||||||
),
|
),
|
||||||
state=state.name,
|
state=state.name,
|
||||||
)
|
)
|
||||||
await cls.objects.filter(q).aupdate(state_ready=True)
|
select_query = cls.objects.filter(q)[: cls.SCHEDULE_BATCH_SIZE]
|
||||||
|
await cls.objects.filter(pk__in=select_query).aupdate(state_ready=True)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def atransition_delete_due(cls, now=None):
|
async def atransition_delete_due(cls, now=None):
|
||||||
|
@ -153,12 +156,13 @@ class StatorModel(models.Model):
|
||||||
for state in cls.state_graph.states.values():
|
for state in cls.state_graph.states.values():
|
||||||
state = cast(State, state)
|
state = cast(State, state)
|
||||||
if state.delete_after:
|
if state.delete_after:
|
||||||
await cls.objects.filter(
|
select_query = cls.objects.filter(
|
||||||
state=state,
|
state=state,
|
||||||
state_changed__lte=(
|
state_changed__lte=(
|
||||||
now - datetime.timedelta(seconds=state.delete_after)
|
now - datetime.timedelta(seconds=state.delete_after)
|
||||||
),
|
),
|
||||||
).adelete()
|
)[: cls.SCHEDULE_BATCH_SIZE]
|
||||||
|
await cls.objects.filter(pk__in=select_query).adelete()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def transition_get_with_lock(
|
def transition_get_with_lock(
|
||||||
|
@ -199,9 +203,10 @@ class StatorModel(models.Model):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def atransition_clean_locks(cls):
|
async def atransition_clean_locks(cls):
|
||||||
await cls.objects.filter(state_locked_until__lte=timezone.now()).aupdate(
|
select_query = cls.objects.filter(state_locked_until__lte=timezone.now())[
|
||||||
state_locked_until=None
|
: cls.SCHEDULE_BATCH_SIZE
|
||||||
)
|
]
|
||||||
|
await cls.objects.filter(pk__in=select_query).aupdate(state_locked_until=None)
|
||||||
|
|
||||||
def transition_schedule(self):
|
def transition_schedule(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -169,9 +169,13 @@ class StatorRunner:
|
||||||
print("No tasks handled since last flush.")
|
print("No tasks handled since last flush.")
|
||||||
with sentry.start_transaction(op="task", name="stator.run_scheduling"):
|
with sentry.start_transaction(op="task", name="stator.run_scheduling"):
|
||||||
for model in self.models:
|
for model in self.models:
|
||||||
|
print(f"Scheduling {model._meta.label_lower}")
|
||||||
await self.submit_stats(model)
|
await self.submit_stats(model)
|
||||||
|
print(" Cleaning locks")
|
||||||
await model.atransition_clean_locks()
|
await model.atransition_clean_locks()
|
||||||
|
print(" Scheduling due items")
|
||||||
await model.atransition_schedule_due()
|
await model.atransition_schedule_due()
|
||||||
|
print(" Deleting due items")
|
||||||
await model.atransition_delete_due()
|
await model.atransition_delete_due()
|
||||||
|
|
||||||
async def submit_stats(self, model):
|
async def submit_stats(self, model):
|
||||||
|
|
Loading…
Reference in a new issue