77import queue
88import weakref
99from collections import defaultdict
10- from threading import Lock
10+ from threading import Lock , Thread
1111from typing import Any , Callable , Generic , cast
1212
1313from redux .autorun import Autorun
@@ -50,7 +50,6 @@ def __init__(
5050 options : CreateStoreOptions [Action , Event ] | None = None ,
5151 ) -> None :
5252 """Create a new store."""
53- self .finished = False
5453 self .store_options = options or CreateStoreOptions ()
5554 self .reducer = reducer
5655 self ._create_task = self .store_options .task_creator
@@ -125,8 +124,8 @@ def _run_actions(self: Store[State, Action, Event]) -> None:
125124
126125 def _run_event_handlers (self : Store [State , Action , Event ]) -> None :
127126 event = self ._events .pop (0 )
128- for event_handler_ in self ._event_handlers [type (event )].copy ():
129- self ._event_handlers_queue .put_nowait ((event_handler_ , event ))
127+ for event_handler in self ._event_handlers [type (event )].copy ():
128+ self ._event_handlers_queue .put_nowait ((event_handler , event ))
130129
131130 def run (self : Store [State , Action , Event ]) -> None :
132131 """Run the store."""
@@ -137,13 +136,6 @@ def run(self: Store[State, Action, Event]) -> None:
137136
138137 if len (self ._events ) > 0 :
139138 self ._run_event_handlers ()
140- if (
141- self .finished
142- and self ._actions == []
143- and self ._events == []
144- and not any (worker .is_alive () for worker in self ._workers )
145- ):
146- self .clean_up ()
147139
148140 def clean_up (self : Store [State , Action , Event ]) -> None :
149141 """Clean up the store."""
@@ -152,8 +144,6 @@ def clean_up(self: Store[State, Action, Event]) -> None:
152144 self ._workers .clear ()
153145 self ._listeners .clear ()
154146 self ._event_handlers .clear ()
155- if self .store_options .on_finish :
156- self .store_options .on_finish ()
157147
158148 def dispatch (
159149 self : Store [State , Action , Event ],
@@ -225,10 +215,30 @@ def unsubscribe() -> None:
225215
226216 return unsubscribe
227217
218+ def wait_for_store_to_finish (self : Store [State , Action , Event ]) -> None :
219+ """Wait for the store to finish."""
220+ import time
221+
222+ while True :
223+ if (
224+ self ._actions == []
225+ and self ._events == []
226+ and self ._event_handlers_queue .qsize () == 0
227+ ):
228+ time .sleep (self .store_options .grace_time_in_seconds )
229+ self ._event_handlers_queue .join ()
230+ for _ in range (self .store_options .threads ):
231+ self ._event_handlers_queue .put_nowait (None )
232+ self ._event_handlers_queue .join ()
233+ self .clean_up ()
234+ if self .store_options .on_finish :
235+ self .store_options .on_finish ()
236+ break
237+ time .sleep (0.1 )
238+
228239 def _handle_finish_event (self : Store [State , Action , Event ]) -> None :
229- for _ in range (self .store_options .threads ):
230- self ._event_handlers_queue .put_nowait (None )
231- self .finished = True
240+ thread = Thread (target = self .wait_for_store_to_finish )
241+ thread .start ()
232242
233243 def autorun (
234244 self : Store [State , Action , Event ],
0 commit comments