88from functools import wraps
99from typing import Any , Dict , Generator , List , Optional , Tuple
1010
11+ from .. import utils
1112from ..services import data_streamer
1213from . import enums , steps , traces
1314
1415logger = logging .getLogger (__name__ )
1516
17+ _publish = utils .get_env_variable ("PUBLISH" ) == "true"
1618_streamer = None
17- try :
18- _streamer = data_streamer .DataStreamer (publish = True )
19- # pylint: disable=broad-except
20- except Exception as exc :
21- logger .error (
22- "You have not provided enough information to upload traces to Openlayer."
23- "\n %s \n "
24- "To upload the traces, please provide the missing information and try again." ,
25- exc ,
26- )
19+ if _publish :
20+ _streamer = data_streamer .DataStreamer ()
2721
2822_current_step = contextvars .ContextVar ("current_step" )
2923_current_trace = contextvars .ContextVar ("current_trace" )
3024
3125
26+ def get_current_trace () -> Optional [traces .Trace ]:
27+ """Returns the current trace."""
28+ return _current_trace .get (None )
29+
30+
31+ def get_current_step () -> Optional [steps .Step ]:
32+ """Returns the current step."""
33+ return _current_step .get (None )
34+
35+
3236@contextmanager
3337def create_step (
3438 name : str ,
@@ -43,7 +47,7 @@ def create_step(
4347 )
4448 new_step .start_time = time .time ()
4549
46- parent_step : Optional [steps .Step ] = _current_step . get ( None )
50+ parent_step : Optional [steps .Step ] = get_current_step ( )
4751 is_root_step : bool = parent_step is None
4852
4953 if parent_step is None :
@@ -53,7 +57,7 @@ def create_step(
5357 current_trace .add_step (new_step )
5458 else :
5559 logger .debug ("Adding step %s to parent step %s" , name , parent_step .name )
56- current_trace = _current_trace . get ()
60+ current_trace = get_current_trace ()
5761 parent_step .add_nested_step (new_step )
5862
5963 token = _current_step .set (new_step )
@@ -86,14 +90,11 @@ def create_step(
8690 "prompt" : new_step .inputs .get ("prompt" ),
8791 }
8892 )
89- if _streamer :
90- _streamer .stream_data (data = trace_data , config = config )
91- else :
92- logger .warning (
93- "Trace computed but not uploaded to Openlayer. "
94- "You have not provided enough information to upload traces to"
95- " Openlayer."
96- )
93+ if _publish :
94+ try :
95+ _streamer .stream_data (data = trace_data , config = config )
96+ except Exception as _ :
97+ logger .error ("Could not stream data to Openlayer" )
9798 else :
9899 logger .debug ("Ending step %s" , name )
99100
0 commit comments