33
44import queue
55import threading
6+ import weakref
67from collections import defaultdict
78from inspect import signature
89from threading import Lock
@@ -67,10 +68,10 @@ def create_store(
6768 store_options = options or CreateStoreOptions ()
6869
6970 state : State | None = None
70- listeners : set [Callable [[State ], Any ]] = set ()
71+ listeners : set [Callable [[State ], Any ] | weakref . ref [ Callable [[ State ], Any ]] ] = set ()
7172 event_handlers : defaultdict [
7273 type [Event ],
73- set [tuple [EventHandler , EventSubscriptionOptions ]],
74+ set [tuple [EventHandler | weakref . ref [ EventHandler ] , EventSubscriptionOptions ]],
7475 ] = defaultdict (set )
7576
7677 actions : list [Action ] = []
@@ -97,15 +98,31 @@ def run() -> None:
9798 state = result
9899
99100 if isinstance (action , FinishAction ):
100- events . append (cast (Event , FinishEvent ()))
101+ dispatch (cast (Event , FinishEvent ()))
101102
102103 if len (actions ) == 0 and state :
103- for listener in listeners .copy ():
104+ for listener_ in listeners .copy ():
105+ if isinstance (listener_ , weakref .ref ):
106+ listener = listener_ ()
107+ if listener is None :
108+ listeners .remove (listener_ )
109+ continue
110+ else :
111+ listener = listener_
104112 listener (state )
105113
106114 if len (events ) > 0 :
107115 event = events .pop (0 )
108- for event_handler , options in event_handlers [type (event )].copy ():
116+ for event_handler_ , options in event_handlers [type (event )].copy ():
117+ if isinstance (event_handler_ , weakref .ref ):
118+ event_handler = event_handler_ ()
119+ if event_handler is None :
120+ event_handlers [type (event )].remove (
121+ (event_handler_ , options ),
122+ )
123+ continue
124+ else :
125+ event_handler = event_handler_
109126 if options .run_async :
110127 event_handlers_queue .put ((event_handler , event ))
111128 elif len (signature (event_handler ).parameters ) == 1 :
@@ -140,19 +157,33 @@ def dispatch(
140157 if store_options .scheduler is None and not is_running .locked ():
141158 run ()
142159
143- def subscribe (listener : Callable [[State ], Any ]) -> Callable [[], None ]:
144- listeners .add (listener )
145- return lambda : listeners .remove (listener )
160+ def subscribe (
161+ listener : Callable [[State ], Any ],
162+ * ,
163+ keep_ref : bool = True ,
164+ ) -> Callable [[], None ]:
165+ listener_ref = listener if keep_ref else weakref .ref (listener )
166+
167+ listeners .add (listener_ref )
168+ return lambda : listeners .remove (listener_ref )
146169
147170 def subscribe_event (
148171 event_type : type [Event2 ],
149172 handler : EventHandler [Event2 ],
173+ * ,
150174 options : EventSubscriptionOptions | None = None ,
151175 ) -> Callable [[], None ]:
152- _options = EventSubscriptionOptions () if options is None else options
153- event_handlers [cast (type [Event ], event_type )].add ((handler , _options ))
176+ subscription_options = (
177+ EventSubscriptionOptions () if options is None else options
178+ )
179+
180+ handler_ref = handler if subscription_options .keep_ref else weakref .ref (handler )
181+
182+ event_handlers [cast (type [Event ], event_type )].add (
183+ (handler_ref , subscription_options ),
184+ )
154185 return lambda : event_handlers [cast (type [Event ], event_type )].remove (
155- (handler , _options ),
186+ (handler_ref , subscription_options ),
156187 )
157188
158189 def handle_finish_event (_event : Event ) -> None :
@@ -181,7 +212,10 @@ def decorator(
181212 last_selector_result : SelectorOutput | None = None
182213 last_comparator_result : ComparatorOutput = cast (ComparatorOutput , object ())
183214 last_value : AutorunOriginalReturnType | None = autorun_options .default_value
184- subscriptions : list [Callable [[AutorunOriginalReturnType ], Any ]] = []
215+ subscriptions : set [
216+ Callable [[AutorunOriginalReturnType ], Any ]
217+ | weakref .ref [Callable [[AutorunOriginalReturnType ], Any ]]
218+ ] = set ()
185219
186220 def check_and_call (state : State ) -> None :
187221 nonlocal \
@@ -217,7 +251,14 @@ def check_and_call(state: State) -> None:
217251 selector_result ,
218252 previous_result ,
219253 )
220- for subscriber in subscriptions :
254+ for subscriber_ in subscriptions .copy ():
255+ if isinstance (subscriber_ , weakref .ref ):
256+ subscriber = subscriber_ ()
257+ if subscriber is None :
258+ subscriptions .remove (subscriber_ )
259+ continue
260+ else :
261+ subscriber = subscriber_
221262 subscriber (last_value )
222263
223264 if autorun_options .initial_run and state is not None :
@@ -241,14 +282,16 @@ def subscribe(
241282 * ,
242283 immediate_run : bool
243284 | None = autorun_options .subscribers_immediate_run ,
285+ keep_ref : bool | None = autorun_options .subscribers_keep_ref ,
244286 ) -> Callable [[], None ]:
245- subscriptions .append (callback )
287+ callback_ref = callback if keep_ref else weakref .ref (callback )
288+ subscriptions .add (callback_ref )
246289
247290 if immediate_run :
248291 callback (self .value )
249292
250293 def unsubscribe () -> None :
251- subscriptions .remove (callback )
294+ subscriptions .remove (callback_ref )
252295
253296 return unsubscribe
254297
0 commit comments