From f4a8a96b813404dc9fa9f81505aeb5b46cc3e074 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 8 Jan 2023 12:43:32 -0700 Subject: [PATCH] Add first-class deletion support to Stator --- activities/models/fan_out.py | 2 +- activities/models/post.py | 2 +- activities/models/post_interaction.py | 2 +- stator/graph.py | 2 ++ stator/models.py | 27 ++++++++++++++++++++++++--- stator/runner.py | 1 + users/models/follow.py | 2 +- users/models/identity.py | 2 +- users/models/inbox_message.py | 18 ++++-------------- 9 files changed, 36 insertions(+), 22 deletions(-) diff --git a/activities/models/fan_out.py b/activities/models/fan_out.py index f36b5bb..40d73dd 100644 --- a/activities/models/fan_out.py +++ b/activities/models/fan_out.py @@ -11,7 +11,7 @@ from users.models import FollowStates class FanOutStates(StateGraph): new = State(try_interval=600) sent = State() - failed = State() + failed = State(delete_after=86400) new.transitions_to(sent) new.times_out_to(failed, seconds=86400 * 3) diff --git a/activities/models/post.py b/activities/models/post.py index c199d77..ed28c83 100644 --- a/activities/models/post.py +++ b/activities/models/post.py @@ -42,7 +42,7 @@ class PostStates(StateGraph): new = State(try_interval=300) fanned_out = State(externally_progressed=True) deleted = State(try_interval=300) - deleted_fanned_out = State() + deleted_fanned_out = State(delete_after=24 * 60 * 60) edited = State(try_interval=300) edited_fanned_out = State(externally_progressed=True) diff --git a/activities/models/post_interaction.py b/activities/models/post_interaction.py index 2589dca..bef4ad6 100644 --- a/activities/models/post_interaction.py +++ b/activities/models/post_interaction.py @@ -12,7 +12,7 @@ class PostInteractionStates(StateGraph): new = State(try_interval=300) fanned_out = State(externally_progressed=True) undone = State(try_interval=300) - undone_fanned_out = State() + undone_fanned_out = State(delete_after=24 * 60 * 60) new.transitions_to(fanned_out) fanned_out.transitions_to(undone) diff --git a/stator/graph.py b/stator/graph.py index da818e9..7019e3f 100644 --- a/stator/graph.py +++ b/stator/graph.py @@ -90,12 +90,14 @@ class State: externally_progressed: bool = False, attempt_immediately: bool = True, force_initial: bool = False, + delete_after: int | None = None, ): self.try_interval = try_interval self.handler_name = handler_name self.externally_progressed = externally_progressed self.attempt_immediately = attempt_immediately self.force_initial = force_initial + self.delete_after = delete_after self.parents: set["State"] = set() self.children: set["State"] = set() self.timeout_state: State | None = None diff --git a/stator/models.py b/stator/models.py index 0987724..95b5f5b 100644 --- a/stator/models.py +++ b/stator/models.py @@ -123,6 +123,8 @@ class StatorModel(models.Model): """ Finds instances of this model that need to run and schedule them. """ + if now is None: + now = timezone.now() q = models.Q() for state in cls.state_graph.states.values(): state = cast(State, state) @@ -130,9 +132,11 @@ class StatorModel(models.Model): q = q | models.Q( ( models.Q( - state_attempted__lte=timezone.now() - - datetime.timedelta( - seconds=cast(float, state.try_interval) + state_attempted__lte=( + now + - datetime.timedelta( + seconds=cast(float, state.try_interval) + ) ) ) | models.Q(state_attempted__isnull=True) @@ -141,6 +145,23 @@ class StatorModel(models.Model): ) await cls.objects.filter(q).aupdate(state_ready=True) + @classmethod + async def atransition_delete_due(cls, now=None): + """ + Finds instances of this model that need to be deleted and deletes them. + """ + if now is None: + now = timezone.now() + for state in cls.state_graph.states.values(): + state = cast(State, state) + if state.delete_after: + await cls.objects.filter( + state=state, + state_changed__lte=( + now - datetime.timedelta(seconds=state.delete_after) + ), + ).adelete() + @classmethod def transition_get_with_lock( cls, number: int, lock_expiry: datetime.datetime diff --git a/stator/runner.py b/stator/runner.py index 7a39296..db88aff 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -116,6 +116,7 @@ class StatorRunner: asyncio.create_task(self.submit_stats(model)) asyncio.create_task(model.atransition_clean_locks()) asyncio.create_task(model.atransition_schedule_due()) + asyncio.create_task(model.atransition_delete_due()) self.last_clean = time.monotonic() async def submit_stats(self, model): diff --git a/users/models/follow.py b/users/models/follow.py index dc116b6..ac6948d 100644 --- a/users/models/follow.py +++ b/users/models/follow.py @@ -14,7 +14,7 @@ class FollowStates(StateGraph): remote_requested = State(try_interval=24 * 60 * 60) accepted = State(externally_progressed=True) undone = State(try_interval=60 * 60) - undone_remotely = State() + undone_remotely = State(delete_after=24 * 60 * 60) failed = State() rejected = State() diff --git a/users/models/identity.py b/users/models/identity.py index f62c3a5..856643c 100644 --- a/users/models/identity.py +++ b/users/models/identity.py @@ -48,7 +48,7 @@ class IdentityStates(StateGraph): edited = State(try_interval=300, attempt_immediately=True) deleted = State(try_interval=300, attempt_immediately=True) - deleted_fanned_out = State(externally_progressed=True) + deleted_fanned_out = State(delete_after=86400 * 7) deleted.transitions_to(deleted_fanned_out) diff --git a/users/models/inbox_message.py b/users/models/inbox_message.py index 00e5b74..baa81c8 100644 --- a/users/models/inbox_message.py +++ b/users/models/inbox_message.py @@ -5,15 +5,12 @@ from stator.models import State, StateField, StateGraph, StatorModel class InboxMessageStates(StateGraph): - received = State(try_interval=300) - processed = State(externally_progressed=True) - purge = State(try_interval=300) - purged = State() # Not actually real, nothing gets here + received = State(try_interval=300, delete_after=86400 * 3) + processed = State(externally_progressed=True, delete_after=86400) + purge = State(delete_after=24 * 60 * 60) # Delete after release (back compat) received.transitions_to(processed) - processed.times_out_to(purge, 86400 * 1) - received.times_out_to(purge, 86400 * 3) - purge.transitions_to(purged) + processed.transitions_to(purge) # Delete after release (back compat) @classmethod async def handle_received(cls, instance: "InboxMessage"): @@ -152,13 +149,6 @@ class InboxMessageStates(StateGraph): raise ValueError(f"Cannot handle activity of type {unknown}") return cls.processed - @classmethod - async def handle_purge(cls, instance: "InboxMessage"): - """ - Just delete them! - """ - await InboxMessage.objects.filter(pk=instance.pk).adelete() - class InboxMessage(StatorModel): """