|
| 1 | +""" |
| 2 | +Nested Traffic light machine |
| 3 | +---------------------------- |
| 4 | +
|
| 5 | +Demonstrates the concept of nested compound states. |
| 6 | +
|
| 7 | +From this example on XState: https://xstate.js.org/docs/guides/hierarchical.html#api |
| 8 | +
|
| 9 | +""" |
| 10 | +import time |
| 11 | + |
| 12 | +from statemachine import State |
| 13 | +from statemachine import StateMachine |
| 14 | + |
| 15 | + |
| 16 | +class NestedTrafficLightMachine(StateMachine): |
| 17 | + "A traffic light machine" |
| 18 | + green = State(initial=True, enter="reset_elapsed") |
| 19 | + yellow = State(enter="reset_elapsed") |
| 20 | + |
| 21 | + class red(State.Builder, enter="reset_elapsed"): |
| 22 | + "Pedestrian states" |
| 23 | + walk = State(initial=True) |
| 24 | + wait = State() |
| 25 | + stop = State() |
| 26 | + blinking = State() |
| 27 | + |
| 28 | + ped_countdown = walk.to(wait) | wait.to(stop) |
| 29 | + |
| 30 | + assert isinstance(red, State) |
| 31 | + timer = green.to(yellow) | yellow.to(red) | red.to(green) |
| 32 | + power_outage = red.blinking.from_() |
| 33 | + power_restored = red.from_() |
| 34 | + |
| 35 | + def __init__(self, seconds_to_turn_state=5, seconds_running=20): |
| 36 | + self.seconds_to_turn_state = seconds_to_turn_state |
| 37 | + self.seconds_running = seconds_running |
| 38 | + super().__init__(queued=True) |
| 39 | + |
| 40 | + def on_timer(self, event: str, source: State, target: State): |
| 41 | + print(f".. Running {event} from {source.id} to {target.id}") |
| 42 | + |
| 43 | + def reset_elapsed(self, event: str, time): |
| 44 | + print(f"entering reset_elapsed from {event} with {time}") |
| 45 | + self.last_turn = time if time else 0 |
| 46 | + |
| 47 | + @timer.cond |
| 48 | + def time_is_over(self, time): |
| 49 | + return time - self.last_turn > self.seconds_to_turn_state |
| 50 | + |
| 51 | + def run_forever(self): |
| 52 | + self.running = True |
| 53 | + start_time = time.time() |
| 54 | + while self.running: |
| 55 | + print("tick!") |
| 56 | + time.sleep(1) |
| 57 | + curr_time = time.time() |
| 58 | + self.send("timer", time=curr_time) |
| 59 | + |
| 60 | + if curr_time - start_time > self.seconds_running: |
| 61 | + self.running = False |
| 62 | + |
| 63 | + |
| 64 | +sm = NestedTrafficLightMachine() |
0 commit comments