diff --git a/activities/models/fan_out.py b/activities/models/fan_out.py index 2f73ba4..a0de7b8 100644 --- a/activities/models/fan_out.py +++ b/activities/models/fan_out.py @@ -8,10 +8,12 @@ from stator.models import State, StateField, StateGraph, StatorModel class FanOutStates(StateGraph): - new = State(try_interval=300) + new = State(try_interval=600) sent = State() + failed = State() new.transitions_to(sent) + new.times_out_to(failed, seconds=86400 * 3) @classmethod async def handle_new(cls, instance: "FanOut"): diff --git a/stator/graph.py b/stator/graph.py index 0ec5ee7..da818e9 100644 --- a/stator/graph.py +++ b/stator/graph.py @@ -98,6 +98,8 @@ class State: self.force_initial = force_initial self.parents: set["State"] = set() self.children: set["State"] = set() + self.timeout_state: State | None = None + self.timeout_value: int | None = None def _add_to_graph(self, graph: type[StateGraph], name: str): self.graph = graph @@ -124,6 +126,14 @@ class State: self.children.add(other) other.parents.add(other) + def times_out_to(self, other: "State", seconds: int): + if self.timeout_state is not None: + raise ValueError("Timeout state already set!") + self.timeout_state = other + self.timeout_value = seconds + self.children.add(other) + other.parents.add(other) + @property def initial(self): return self.force_initial or (not self.parents) diff --git a/stator/models.py b/stator/models.py index c6e777a..04bbd79 100644 --- a/stator/models.py +++ b/stator/models.py @@ -159,7 +159,7 @@ class StatorModel(models.Model): """ Attempts to transition the current state by running its handler(s). """ - current_state = self.state_graph.states[self.state] + current_state: State = self.state_graph.states[self.state] # If it's a manual progression state don't even try # We shouldn't really be here in this case, but it could be a race condition if current_state.externally_progressed: @@ -168,7 +168,7 @@ class StatorModel(models.Model): ) return None try: - next_state = await current_state.handler(self) + next_state = await current_state.handler(self) # type: ignore except BaseException as e: await exceptions.acapture_exception(e) traceback.print_exc() @@ -184,6 +184,14 @@ class StatorModel(models.Model): ) await self.atransition_perform(next_state) return next_state + # See if it timed out + if ( + current_state.timeout_value + and current_state.timeout_value + <= (timezone.now() - self.state_changed).total_seconds() + ): + await self.atransition_perform(current_state.timeout_state) + return current_state.timeout_state await self.__class__.objects.filter(pk=self.pk).aupdate( state_attempted=timezone.now(), state_locked_until=None,