11import json
22import os
33import pytest
4+ import shutil
5+ import uuid
46
57import ray
68
1012from tests .conftest import TestTransport
1113
1214
15+ @pytest .fixture (autouse = True )
16+ def shutdown_ray (tmpdir ):
17+ yield
18+ ray .shutdown ()
19+
20+
1321class RayTestTransport (TestTransport ):
1422 def __init__ (self ):
1523 self .envelopes = []
@@ -20,9 +28,6 @@ def capture_envelope(self, envelope: Envelope) -> None:
2028
2129
2230class RayLoggingTransport (TestTransport ):
23- def __init__ (self ):
24- super ().__init__ ()
25-
2631 def capture_envelope (self , envelope : Envelope ) -> None :
2732 print (envelope .serialize ().decode ("utf-8" , "replace" ))
2833
@@ -39,13 +44,24 @@ def setup_sentry(transport=None):
3944 )
4045
4146
42- def read_error_from_log (job_id ):
43- log_dir = "/tmp/ray/session_latest/logs/"
47+ def read_error_from_log (job_id , ray_temp_dir ):
48+ # Find the actual session directory that Ray created
49+ session_dirs = [d for d in os .listdir (ray_temp_dir ) if d .startswith ("session_" )]
50+ if not session_dirs :
51+ raise FileNotFoundError (f"No session directory found in { ray_temp_dir } " )
52+
53+ session_dir = os .path .join (ray_temp_dir , session_dirs [0 ])
54+ log_dir = os .path .join (session_dir , "logs" )
55+
56+ if not os .path .exists (log_dir ):
57+ raise FileNotFoundError (f"No logs directory found at { log_dir } " )
58+
4459 log_file = [
4560 f
4661 for f in os .listdir (log_dir )
4762 if "worker" in f and job_id in f and f .endswith (".out" )
4863 ][0 ]
64+
4965 with open (os .path .join (log_dir , log_file ), "r" ) as file :
5066 lines = file .readlines ()
5167
@@ -58,7 +74,6 @@ def read_error_from_log(job_id):
5874 return error
5975
6076
61- @pytest .mark .forked
6277@pytest .mark .parametrize (
6378 "task_options" , [{}, {"num_cpus" : 0 , "memory" : 1024 * 1024 * 10 }]
6479)
@@ -124,40 +139,47 @@ def example_task():
124139 )
125140
126141
127- @pytest .mark .forked
128142def test_errors_in_ray_tasks ():
129143 setup_sentry_with_logging_transport ()
130144
131- ray .init (
132- runtime_env = {
133- "worker_process_setup_hook" : setup_sentry_with_logging_transport ,
134- "working_dir" : "./" ,
135- }
136- )
145+ ray_temp_dir = os .path .join ("/tmp" , f"ray_test_{ uuid .uuid4 ().hex [:8 ]} " )
146+ os .makedirs (ray_temp_dir , exist_ok = True )
137147
138- # Setup ray task
139- @ray .remote
140- def example_task ():
141- 1 / 0
148+ try :
149+ ray .init (
150+ runtime_env = {
151+ "worker_process_setup_hook" : setup_sentry_with_logging_transport ,
152+ "working_dir" : "./" ,
153+ },
154+ _temp_dir = ray_temp_dir ,
155+ )
142156
143- with sentry_sdk . start_transaction ( op = "task" , name = "ray test transaction" ):
144- with pytest . raises ( ZeroDivisionError ):
145- future = example_task . remote ()
146- ray . get ( future )
157+ # Setup ray task
158+ @ ray . remote
159+ def example_task ():
160+ 1 / 0
147161
148- job_id = future .job_id ().hex ()
149- error = read_error_from_log (job_id )
162+ with sentry_sdk .start_transaction (op = "task" , name = "ray test transaction" ):
163+ with pytest .raises (ZeroDivisionError ):
164+ future = example_task .remote ()
165+ ray .get (future )
150166
151- assert error ["level" ] == "error"
152- assert (
153- error ["transaction" ]
154- == "tests.integrations.ray.test_ray.test_errors_in_ray_tasks.<locals>.example_task"
155- )
156- assert error ["exception" ]["values" ][0 ]["mechanism" ]["type" ] == "ray"
157- assert not error ["exception" ]["values" ][0 ]["mechanism" ]["handled" ]
167+ job_id = future .job_id ().hex ()
168+ error = read_error_from_log (job_id , ray_temp_dir )
169+
170+ assert error ["level" ] == "error"
171+ assert (
172+ error ["transaction" ]
173+ == "tests.integrations.ray.test_ray.test_errors_in_ray_tasks.<locals>.example_task"
174+ )
175+ assert error ["exception" ]["values" ][0 ]["mechanism" ]["type" ] == "ray"
176+ assert not error ["exception" ]["values" ][0 ]["mechanism" ]["handled" ]
177+
178+ finally :
179+ if os .path .exists (ray_temp_dir ):
180+ shutil .rmtree (ray_temp_dir , ignore_errors = True )
158181
159182
160- @pytest .mark .forked
161183def test_tracing_in_ray_actors ():
162184 setup_sentry ()
163185
@@ -194,37 +216,45 @@ def increment(self):
194216 assert worker_envelopes == []
195217
196218
197- @pytest .mark .forked
198219def test_errors_in_ray_actors ():
199220 setup_sentry_with_logging_transport ()
200221
201- ray .init (
202- runtime_env = {
203- "worker_process_setup_hook" : setup_sentry_with_logging_transport ,
204- "working_dir" : "./" ,
205- }
206- )
207-
208- # Setup ray actor
209- @ray .remote
210- class Counter :
211- def __init__ (self ):
212- self .n = 0
213-
214- def increment (self ):
215- with sentry_sdk .start_span (op = "task" , name = "example actor execution" ):
216- 1 / 0
217-
218- return sentry_sdk .get_client ().transport .envelopes
219-
220- with sentry_sdk .start_transaction (op = "task" , name = "ray test transaction" ):
221- with pytest .raises (ZeroDivisionError ):
222- counter = Counter .remote ()
223- future = counter .increment .remote ()
224- ray .get (future )
225-
226- job_id = future .job_id ().hex ()
227- error = read_error_from_log (job_id )
228-
229- # We do not capture errors in ray actors yet
230- assert error is None
222+ ray_temp_dir = os .path .join ("/tmp" , f"ray_test_{ uuid .uuid4 ().hex [:8 ]} " )
223+ os .makedirs (ray_temp_dir , exist_ok = True )
224+
225+ try :
226+ ray .init (
227+ runtime_env = {
228+ "worker_process_setup_hook" : setup_sentry_with_logging_transport ,
229+ "working_dir" : "./" ,
230+ },
231+ _temp_dir = ray_temp_dir ,
232+ )
233+
234+ # Setup ray actor
235+ @ray .remote
236+ class Counter :
237+ def __init__ (self ):
238+ self .n = 0
239+
240+ def increment (self ):
241+ with sentry_sdk .start_span (op = "task" , name = "example actor execution" ):
242+ 1 / 0
243+
244+ return sentry_sdk .get_client ().transport .envelopes
245+
246+ with sentry_sdk .start_transaction (op = "task" , name = "ray test transaction" ):
247+ with pytest .raises (ZeroDivisionError ):
248+ counter = Counter .remote ()
249+ future = counter .increment .remote ()
250+ ray .get (future )
251+
252+ job_id = future .job_id ().hex ()
253+ error = read_error_from_log (job_id , ray_temp_dir )
254+
255+ # We do not capture errors in ray actors yet
256+ assert error is None
257+
258+ finally :
259+ if os .path .exists (ray_temp_dir ):
260+ shutil .rmtree (ray_temp_dir , ignore_errors = True )
0 commit comments