Add first-class deletion support to Stator

This commit is contained in:
Andrew Godwin 2023-01-08 12:43:32 -07:00
parent b5b4a8ac5d
commit f4a8a96b81
9 changed files with 36 additions and 22 deletions

View file

@ -11,7 +11,7 @@ from users.models import FollowStates
class FanOutStates(StateGraph): class FanOutStates(StateGraph):
new = State(try_interval=600) new = State(try_interval=600)
sent = State() sent = State()
failed = State() failed = State(delete_after=86400)
new.transitions_to(sent) new.transitions_to(sent)
new.times_out_to(failed, seconds=86400 * 3) new.times_out_to(failed, seconds=86400 * 3)

View file

@ -42,7 +42,7 @@ class PostStates(StateGraph):
new = State(try_interval=300) new = State(try_interval=300)
fanned_out = State(externally_progressed=True) fanned_out = State(externally_progressed=True)
deleted = State(try_interval=300) deleted = State(try_interval=300)
deleted_fanned_out = State() deleted_fanned_out = State(delete_after=24 * 60 * 60)
edited = State(try_interval=300) edited = State(try_interval=300)
edited_fanned_out = State(externally_progressed=True) edited_fanned_out = State(externally_progressed=True)

View file

@ -12,7 +12,7 @@ class PostInteractionStates(StateGraph):
new = State(try_interval=300) new = State(try_interval=300)
fanned_out = State(externally_progressed=True) fanned_out = State(externally_progressed=True)
undone = State(try_interval=300) undone = State(try_interval=300)
undone_fanned_out = State() undone_fanned_out = State(delete_after=24 * 60 * 60)
new.transitions_to(fanned_out) new.transitions_to(fanned_out)
fanned_out.transitions_to(undone) fanned_out.transitions_to(undone)

View file

@ -90,12 +90,14 @@ class State:
externally_progressed: bool = False, externally_progressed: bool = False,
attempt_immediately: bool = True, attempt_immediately: bool = True,
force_initial: bool = False, force_initial: bool = False,
delete_after: int | None = None,
): ):
self.try_interval = try_interval self.try_interval = try_interval
self.handler_name = handler_name self.handler_name = handler_name
self.externally_progressed = externally_progressed self.externally_progressed = externally_progressed
self.attempt_immediately = attempt_immediately self.attempt_immediately = attempt_immediately
self.force_initial = force_initial self.force_initial = force_initial
self.delete_after = delete_after
self.parents: set["State"] = set() self.parents: set["State"] = set()
self.children: set["State"] = set() self.children: set["State"] = set()
self.timeout_state: State | None = None self.timeout_state: State | None = None

View file

@ -123,6 +123,8 @@ class StatorModel(models.Model):
""" """
Finds instances of this model that need to run and schedule them. Finds instances of this model that need to run and schedule them.
""" """
if now is None:
now = timezone.now()
q = models.Q() q = models.Q()
for state in cls.state_graph.states.values(): for state in cls.state_graph.states.values():
state = cast(State, state) state = cast(State, state)
@ -130,17 +132,36 @@ class StatorModel(models.Model):
q = q | models.Q( q = q | models.Q(
( (
models.Q( models.Q(
state_attempted__lte=timezone.now() state_attempted__lte=(
now
- datetime.timedelta( - datetime.timedelta(
seconds=cast(float, state.try_interval) seconds=cast(float, state.try_interval)
) )
) )
)
| models.Q(state_attempted__isnull=True) | models.Q(state_attempted__isnull=True)
), ),
state=state.name, state=state.name,
) )
await cls.objects.filter(q).aupdate(state_ready=True) await cls.objects.filter(q).aupdate(state_ready=True)
@classmethod
async def atransition_delete_due(cls, now=None):
"""
Finds instances of this model that need to be deleted and deletes them.
"""
if now is None:
now = timezone.now()
for state in cls.state_graph.states.values():
state = cast(State, state)
if state.delete_after:
await cls.objects.filter(
state=state,
state_changed__lte=(
now - datetime.timedelta(seconds=state.delete_after)
),
).adelete()
@classmethod @classmethod
def transition_get_with_lock( def transition_get_with_lock(
cls, number: int, lock_expiry: datetime.datetime cls, number: int, lock_expiry: datetime.datetime

View file

@ -116,6 +116,7 @@ class StatorRunner:
asyncio.create_task(self.submit_stats(model)) asyncio.create_task(self.submit_stats(model))
asyncio.create_task(model.atransition_clean_locks()) asyncio.create_task(model.atransition_clean_locks())
asyncio.create_task(model.atransition_schedule_due()) asyncio.create_task(model.atransition_schedule_due())
asyncio.create_task(model.atransition_delete_due())
self.last_clean = time.monotonic() self.last_clean = time.monotonic()
async def submit_stats(self, model): async def submit_stats(self, model):

View file

@ -14,7 +14,7 @@ class FollowStates(StateGraph):
remote_requested = State(try_interval=24 * 60 * 60) remote_requested = State(try_interval=24 * 60 * 60)
accepted = State(externally_progressed=True) accepted = State(externally_progressed=True)
undone = State(try_interval=60 * 60) undone = State(try_interval=60 * 60)
undone_remotely = State() undone_remotely = State(delete_after=24 * 60 * 60)
failed = State() failed = State()
rejected = State() rejected = State()

View file

@ -48,7 +48,7 @@ class IdentityStates(StateGraph):
edited = State(try_interval=300, attempt_immediately=True) edited = State(try_interval=300, attempt_immediately=True)
deleted = State(try_interval=300, attempt_immediately=True) deleted = State(try_interval=300, attempt_immediately=True)
deleted_fanned_out = State(externally_progressed=True) deleted_fanned_out = State(delete_after=86400 * 7)
deleted.transitions_to(deleted_fanned_out) deleted.transitions_to(deleted_fanned_out)

View file

@ -5,15 +5,12 @@ from stator.models import State, StateField, StateGraph, StatorModel
class InboxMessageStates(StateGraph): class InboxMessageStates(StateGraph):
received = State(try_interval=300) received = State(try_interval=300, delete_after=86400 * 3)
processed = State(externally_progressed=True) processed = State(externally_progressed=True, delete_after=86400)
purge = State(try_interval=300) purge = State(delete_after=24 * 60 * 60) # Delete after release (back compat)
purged = State() # Not actually real, nothing gets here
received.transitions_to(processed) received.transitions_to(processed)
processed.times_out_to(purge, 86400 * 1) processed.transitions_to(purge) # Delete after release (back compat)
received.times_out_to(purge, 86400 * 3)
purge.transitions_to(purged)
@classmethod @classmethod
async def handle_received(cls, instance: "InboxMessage"): async def handle_received(cls, instance: "InboxMessage"):
@ -152,13 +149,6 @@ class InboxMessageStates(StateGraph):
raise ValueError(f"Cannot handle activity of type {unknown}") raise ValueError(f"Cannot handle activity of type {unknown}")
return cls.processed return cls.processed
@classmethod
async def handle_purge(cls, instance: "InboxMessage"):
"""
Just delete them!
"""
await InboxMessage.objects.filter(pk=instance.pk).adelete()
class InboxMessage(StatorModel): class InboxMessage(StatorModel):
""" """