File tree Expand file tree Collapse file tree 1 file changed +10
-9
lines changed Expand file tree Collapse file tree 1 file changed +10
-9
lines changed Original file line number Diff line number Diff line change @@ -183,22 +183,22 @@ def _cleanup(self, nonblock=False):
183183 # so the `get_current_session()` call in those thread will raise SessionNotFoundException
184184 del cls .thread2session [id (t )]
185185
186- if self .callback_mq is not None : # 回调功能已经激活, 结束回调线程
187- mq = queue .Queue (maxsize = 1 )
188- mq .put (None )
189- self .callback_mq = mq
190-
191- for mq in self .task_mqs .values ():
192- for _ in range (2 ):
186+ def try_best_to_add_item_to_mq (mq , item , try_count = 10 ):
187+ for _ in range (try_count ):
193188 try :
194- mq .put (None , block = not nonblock ) # 消费端接收到None消息会抛出SessionClosedException异常
195- break
189+ mq .put (item , block = False )
190+ return True
196191 except queue .Full :
197192 try :
198193 mq .get (block = False )
199194 except queue .Empty :
200195 pass
201196
197+ if self .callback_mq is not None : # 回调功能已经激活, 结束回调线程
198+ try_best_to_add_item_to_mq (self .callback_mq , None )
199+
200+ for mq in self .task_mqs .values ():
201+ try_best_to_add_item_to_mq (mq , None ) # 消费端接收到None消息会抛出SessionClosedException异常
202202 self .task_mqs = {}
203203
204204 def close (self , nonblock = False ):
@@ -252,6 +252,7 @@ def run(callback):
252252 except Exception :
253253 # 子类可能会重写 get_current_session ,所以不要用 ThreadBasedSession.get_current_session 来调用
254254 self .get_current_session ().on_task_exception ()
255+ # todo: clean up from `register_thread()`
255256
256257 if mutex :
257258 run (callback )
You can’t perform that action at this time.
0 commit comments