1- import os
21import io
2+ import os
3+ import random
34import re
45import sys
56import threading
6- import random
77import time
88import zlib
9+ from contextlib import contextmanager
910from datetime import datetime
1011from functools import wraps , partial
11- from threading import Event , Lock , Thread
12- from contextlib import contextmanager
1312
1413import sentry_sdk
15- from sentry_sdk ._compat import text_type , utc_from_timestamp , iteritems
14+ from sentry_sdk ._compat import PY2 , text_type , utc_from_timestamp , iteritems
1615from sentry_sdk .utils import (
16+ ContextVar ,
1717 now ,
1818 nanosecond_time ,
1919 to_timestamp ,
2020 serialize_frame ,
2121 json_dumps ,
22+ is_gevent ,
2223)
2324from sentry_sdk .envelope import Envelope , Item
2425from sentry_sdk .tracing import (
5354 from sentry_sdk ._types import MetricValue
5455
5556
56- _thread_local = threading .local ()
57+ try :
58+ from gevent .monkey import get_original # type: ignore
59+ from gevent .threadpool import ThreadPool # type: ignore
60+ except ImportError :
61+ import importlib
62+
63+ def get_original (module , name ):
64+ # type: (str, str) -> Any
65+ return getattr (importlib .import_module (module ), name )
66+
67+
68+ _in_metrics = ContextVar ("in_metrics" )
5769_sanitize_key = partial (re .compile (r"[^a-zA-Z0-9_/.-]+" ).sub , "_" )
5870_sanitize_value = partial (re .compile (r"[^\w\d_:/@\.{}\[\]$-]+" , re .UNICODE ).sub , "_" )
5971_set = set # set is shadowed below
@@ -84,15 +96,12 @@ def get_code_location(stacklevel):
8496def recursion_protection ():
8597 # type: () -> Generator[bool, None, None]
8698 """Enters recursion protection and returns the old flag."""
99+ old_in_metrics = _in_metrics .get (False )
100+ _in_metrics .set (True )
87101 try :
88- in_metrics = _thread_local .in_metrics
89- except AttributeError :
90- in_metrics = False
91- _thread_local .in_metrics = True
92- try :
93- yield in_metrics
102+ yield old_in_metrics
94103 finally :
95- _thread_local . in_metrics = in_metrics
104+ _in_metrics . set ( old_in_metrics )
96105
97106
98107def metrics_noop (func ):
@@ -411,20 +420,30 @@ def __init__(
411420 self ._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
412421 self ._buckets_total_weight = 0
413422 self ._capture_func = capture_func
414- self ._lock = Lock ()
415423 self ._running = True
416- self ._flush_event = Event ()
424+ self ._lock = threading .Lock ()
425+
426+ if is_gevent () and PY2 :
427+ # get_original on threading.Event in Python 2 incorrectly returns
428+ # the gevent-patched class. Luckily, threading.Event is just an alias
429+ # for threading._Event in Python 2, and get_original on
430+ # threading._Event correctly gets us the stdlib original.
431+ event_cls = get_original ("threading" , "_Event" )
432+ else :
433+ event_cls = get_original ("threading" , "Event" )
434+ self ._flush_event = event_cls () # type: threading.Event
435+
417436 self ._force_flush = False
418437
419- # The aggregator shifts it's flushing by up to an entire rollup window to
438+ # The aggregator shifts its flushing by up to an entire rollup window to
420439 # avoid multiple clients trampling on end of a 10 second window as all the
421440 # buckets are anchored to multiples of ROLLUP seconds. We randomize this
422441 # number once per aggregator boot to achieve some level of offsetting
423442 # across a fleet of deployed SDKs. Relay itself will also apply independent
424443 # jittering.
425444 self ._flush_shift = random .random () * self .ROLLUP_IN_SECONDS
426445
427- self ._flusher = None # type: Optional[Thread]
446+ self ._flusher = None # type: Optional[Union[threading. Thread, ThreadPool] ]
428447 self ._flusher_pid = None # type: Optional[int]
429448 self ._ensure_thread ()
430449
@@ -435,25 +454,35 @@ def _ensure_thread(self):
435454 """
436455 if not self ._running :
437456 return False
457+
438458 pid = os .getpid ()
439459 if self ._flusher_pid == pid :
440460 return True
461+
441462 with self ._lock :
442463 self ._flusher_pid = pid
443- self ._flusher = Thread (target = self ._flush_loop )
444- self ._flusher .daemon = True
464+
465+ if not is_gevent ():
466+ self ._flusher = threading .Thread (target = self ._flush_loop )
467+ self ._flusher .daemon = True
468+ start_flusher = self ._flusher .start
469+ else :
470+ self ._flusher = ThreadPool (1 )
471+ start_flusher = partial (self ._flusher .spawn , func = self ._flush_loop )
472+
445473 try :
446- self . _flusher . start ()
474+ start_flusher ()
447475 except RuntimeError :
448476 # Unfortunately at this point the interpreter is in a state that no
449477 # longer allows us to spawn a thread and we have to bail.
450478 self ._running = False
451479 return False
480+
452481 return True
453482
454483 def _flush_loop (self ):
455484 # type: (...) -> None
456- _thread_local . in_metrics = True
485+ _in_metrics . set ( True )
457486 while self ._running or self ._force_flush :
458487 self ._flush ()
459488 if self ._running :
@@ -608,7 +637,6 @@ def kill(self):
608637
609638 self ._running = False
610639 self ._flush_event .set ()
611- self ._flusher .join ()
612640 self ._flusher = None
613641
614642 @metrics_noop
0 commit comments