11# pylint: disable=invalid-name
22import random
33import string
4+ import threading
45import uuid
56import warnings
67from dataclasses import asdict
78from datetime import datetime , timezone
9+ from enum import IntEnum
810from typing import Any , Callable , Dict , Optional
911
1012from apscheduler .executors .pool import ThreadPoolExecutor
1113from apscheduler .job import Job
14+ from apscheduler .jobstores .base import JobLookupError
1215from apscheduler .schedulers .background import BackgroundScheduler
1316from apscheduler .schedulers .base import STATE_RUNNING , BaseScheduler
1417from apscheduler .triggers .interval import IntervalTrigger
6265]
6366
6467
68+ class _RunState (IntEnum ):
69+ UNINITIALIZED = 0
70+ INITIALIZED = 1
71+ SHUTDOWN = 2
72+
73+
6574class ExperimentalMode (TypedDict , total = False ):
6675 type : Literal ["streaming" , "polling" ]
6776
@@ -187,6 +196,8 @@ def __init__(
187196 self .unleash_event_callback = event_callback
188197 self ._ready_callback = build_ready_callback (event_callback )
189198 self .connector_mode : ExperimentalMode = experimental_mode or {"type" : "polling" }
199+ self ._lifecycle_lock = threading .RLock ()
200+ self ._closed = threading .Event ()
190201
191202 self ._do_instance_check (multiple_instance_mode )
192203
@@ -211,7 +222,7 @@ def __init__(
211222 self .strategy_mapping = {** custom_strategies }
212223
213224 # Client status
214- self .is_initialized = False
225+ self ._run_state = _RunState . UNINITIALIZED
215226
216227 # Bootstrapping
217228 if self .unleash_bootstrapped :
@@ -258,6 +269,10 @@ def unleash_metrics_interval_str_millis(self) -> str:
258269 def connection_id (self ):
259270 return self ._connection_id
260271
272+ @property
273+ def is_initialized (self ):
274+ return self ._run_state == _RunState .INITIALIZED
275+
261276 def initialize_client (self , fetch_toggles : bool = True ) -> None :
262277 """
263278 Initializes client and starts communication with central unleash server(s).
@@ -285,8 +300,12 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
285300 pass
286301 """
287302 # Only perform initialization steps if client is not initialized.
288- if not self .is_initialized :
289- # pylint: disable=no-else-raise
303+ with self ._lifecycle_lock :
304+ if self ._closed .is_set () or self ._run_state > _RunState .UNINITIALIZED :
305+ warnings .warn (
306+ "Attempted to initialize an Unleash Client instance that has already been initialized."
307+ )
308+ return
290309 try :
291310 start_scheduler = False
292311 base_headers = {
@@ -388,20 +407,14 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
388407
389408 if start_scheduler :
390409 self .unleash_scheduler .start ()
410+ self ._run_state = _RunState .INITIALIZED
391411
392412 except Exception as excep :
393413 # Log exceptions during initialization. is_initialized will remain false.
394414 LOGGER .warning (
395415 "Exception during UnleashClient initialization: %s" , excep
396416 )
397417 raise excep
398- else :
399- # Set is_initialized to true if no exception is encountered.
400- self .is_initialized = True
401- else :
402- warnings .warn (
403- "Attempted to initialize an Unleash Client instance that has already been initialized."
404- )
405418
406419 def feature_definitions (self ) -> dict :
407420 """
@@ -431,34 +444,42 @@ def destroy(self) -> None:
431444
432445 You shouldn't need this too much!
433446 """
434- if self .connector :
435- self .connector .stop ()
436- if self .metric_job :
437- self .metric_job .remove ()
438-
439- # Flush metrics before shutting down.
440- aggregate_and_send_metrics (
441- url = self .unleash_url ,
442- app_name = self .unleash_app_name ,
443- connection_id = self .connection_id ,
444- instance_id = self .unleash_instance_id ,
445- headers = self .metrics_headers ,
446- custom_options = self .unleash_custom_options ,
447- request_timeout = self .unleash_request_timeout ,
448- engine = self .engine ,
449- )
447+ with self ._lifecycle_lock :
448+ if self ._closed .is_set ():
449+ return
450+ self ._closed .set ()
451+ self ._run_state = _RunState .SHUTDOWN
452+ if self .connector :
453+ self .connector .stop ()
454+
455+ if self .metric_job :
456+ # Flush metrics before shutting down.
457+ aggregate_and_send_metrics (
458+ url = self .unleash_url ,
459+ app_name = self .unleash_app_name ,
460+ connection_id = self .connection_id ,
461+ instance_id = self .unleash_instance_id ,
462+ headers = self .metrics_headers ,
463+ custom_options = self .unleash_custom_options ,
464+ request_timeout = self .unleash_request_timeout ,
465+ engine = self .engine ,
466+ )
467+ try :
468+ self .metric_job .remove ()
469+ except JobLookupError as exc :
470+ LOGGER .info ("Exception during connector teardown: %s" , exc )
450471
451- try :
452- if hasattr (self , "unleash_scheduler" ) and self .unleash_scheduler :
453- self .unleash_scheduler .remove_all_jobs ()
454- self .unleash_scheduler .shutdown (wait = True )
455- except Exception as exc :
456- LOGGER .warning ("Exception during scheduler teardown: %s" , exc )
472+ try :
473+ if hasattr (self , "unleash_scheduler" ) and self .unleash_scheduler :
474+ self .unleash_scheduler .remove_all_jobs ()
475+ self .unleash_scheduler .shutdown (wait = True )
476+ except Exception as exc :
477+ LOGGER .warning ("Exception during scheduler teardown: %s" , exc )
457478
458- try :
459- self .cache .destroy ()
460- except Exception as exc :
461- LOGGER .warning ("Exception during cache teardown: %s" , exc )
479+ try :
480+ self .cache .destroy ()
481+ except Exception as exc :
482+ LOGGER .warning ("Exception during cache teardown: %s" , exc )
462483
463484 @staticmethod
464485 def _get_fallback_value (
0 commit comments