@@ -547,6 +547,16 @@ def _handle_undeliverable_message(
 
 @pytest.mark.timeout(60)
 async def test_actor_log_streaming() -> None:
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
+
     # Save original file descriptors
     original_stdout_fd = os.dup(1)  # stdout
     original_stderr_fd = os.dup(2)  # stderr
@@ -684,6 +694,12 @@ async def test_actor_log_streaming() -> None:
         ), stderr_content
 
     finally:
+        for key, value in old_env.items():
+            if value is None:
+                os.environ.pop(key, None)
+            else:
+                os.environ[key] = value
+
         # Ensure file descriptors are restored even if something goes wrong
         try:
             os.dup2(original_stdout_fd, 1)
@@ -699,6 +715,16 @@ async def test_alloc_based_log_streaming() -> None:
     """Test both AllocHandle.stream_logs = False and True cases."""
 
     async def test_stream_logs_case(stream_logs: bool, test_name: str) -> None:
+        old_env = {}
+        env_vars = {
+            "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+            "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+            "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+        }
+        for key, value in env_vars.items():
+            old_env[key] = os.environ.get(key)
+            os.environ[key] = value
+
         # Save original file descriptors
         original_stdout_fd = os.dup(1)  # stdout
 
@@ -778,6 +804,11 @@ def _stream_logs(self) -> bool:
             ), f"stream_logs=True case: {stdout_content}"
 
         finally:
+            for key, value in old_env.items():
+                if value is None:
+                    os.environ.pop(key, None)
+                else:
+                    os.environ[key] = value
             # Ensure file descriptors are restored even if something goes wrong
             try:
                 os.dup2(original_stdout_fd, 1)
@@ -792,6 +823,16 @@ def _stream_logs(self) -> bool:
 
 @pytest.mark.timeout(60)
 async def test_logging_option_defaults() -> None:
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
+
     # Save original file descriptors
     original_stdout_fd = os.dup(1)  # stdout
     original_stderr_fd = os.dup(2)  # stderr
@@ -870,6 +911,12 @@ async def test_logging_option_defaults() -> None:
         ), stderr_content
 
     finally:
+        for key, value in old_env.items():
+            if value is None:
+                os.environ.pop(key, None)
+            else:
+                os.environ[key] = value
+
         # Ensure file descriptors are restored even if something goes wrong
         try:
             os.dup2(original_stdout_fd, 1)
@@ -906,6 +953,15 @@ def __init__(self):
 @pytest.mark.oss_skip
 async def test_flush_called_only_once() -> None:
     """Test that flush is called only once when ending an ipython cell"""
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
     mock_ipython = MockIPython()
     with unittest.mock.patch(
         "monarch._src.actor.logging.get_ipython",
@@ -926,14 +982,29 @@ async def test_flush_called_only_once() -> None:
 
         # now, flush should be called only once
         mock_ipython.events.trigger("post_run_cell", unittest.mock.MagicMock())
+
         assert mock_flush.call_count == 1
+        for key, value in old_env.items():
+            if value is None:
+                os.environ.pop(key, None)
+            else:
+                os.environ[key] = value
 
 
 # oss_skip: pytest keeps complaining about mocking get_ipython module
 @pytest.mark.oss_skip
 @pytest.mark.timeout(180)
 async def test_flush_logs_ipython() -> None:
     """Test that logs are flushed when get_ipython is available and post_run_cell event is triggered."""
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
     # Save original file descriptors
     original_stdout_fd = os.dup(1)  # stdout
 
@@ -1025,6 +1096,11 @@ async def test_flush_logs_ipython() -> None:
         ), stdout_content
 
     finally:
+        for key, value in old_env.items():
+            if value is None:
+                os.environ.pop(key, None)
+            else:
+                os.environ[key] = value
         # Ensure file descriptors are restored even if something goes wrong
         try:
             os.dup2(original_stdout_fd, 1)
@@ -1036,6 +1112,15 @@ async def test_flush_logs_ipython() -> None:
 # oss_skip: importlib not pulling resource correctly in git CI, needs to be revisited
 @pytest.mark.oss_skip
 async def test_flush_logs_fast_exit() -> None:
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
     # We use a subprocess to run the test so we can handle the flushed logs at the end.
     # Otherwise, it is hard to restore the original stdout/stderr.
 
@@ -1062,13 +1147,29 @@ async def test_flush_logs_fast_exit() -> None:
         == 1
     ), process.stdout
 
+    for key, value in old_env.items():
+        if value is None:
+            os.environ.pop(key, None)
+        else:
+            os.environ[key] = value
+
 
 @pytest.mark.timeout(60)
 async def test_flush_on_disable_aggregation() -> None:
     """Test that logs are flushed when disabling aggregation.
 
     This tests the corner case: "Make sure we flush whatever in the aggregators before disabling aggregation."
     """
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
+
     # Save original file descriptors
     original_stdout_fd = os.dup(1)  # stdout
 
@@ -1148,6 +1249,12 @@ async def test_flush_on_disable_aggregation() -> None:
         ), f"Expected 10 single log lines, got {total_single} from {stdout_content}"
 
     finally:
+        for key, value in old_env.items():
+            if value is None:
+                os.environ.pop(key, None)
+            else:
+                os.environ[key] = value
+
         # Ensure file descriptors are restored even if something goes wrong
         try:
             os.dup2(original_stdout_fd, 1)
@@ -1163,6 +1270,15 @@ async def test_multiple_ongoing_flushes_no_deadlock() -> None:
     Because now a flush call is purely sync, it is very easy to get into a deadlock.
     So we assert the last flush call will not get into such a state.
     """
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
     pm = this_host().spawn_procs(per_host={"gpus": 4})
     am = pm.spawn("printer", Printer)
 
@@ -1185,13 +1301,29 @@ async def test_multiple_ongoing_flushes_no_deadlock() -> None:
     # The last flush should not block
     futures[-1].get()
 
+    for key, value in old_env.items():
+        if value is None:
+            os.environ.pop(key, None)
+        else:
+            os.environ[key] = value
+
 
 @pytest.mark.timeout(60)
 async def test_adjust_aggregation_window() -> None:
     """Test that the flush deadline is updated when the aggregation window is adjusted.
 
     This tests the corner case: "This can happen if the user has adjusted the aggregation window."
     """
+    old_env = {}
+    env_vars = {
+        "HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
+        "HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
+        "HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
+    }
+    for key, value in env_vars.items():
+        old_env[key] = os.environ.get(key)
+        os.environ[key] = value
+
     # Save original file descriptors
     original_stdout_fd = os.dup(1)  # stdout
 
@@ -1258,6 +1390,12 @@ async def test_adjust_aggregation_window() -> None:
         ), stdout_content
 
     finally:
+        for key, value in old_env.items():
+            if value is None:
+                os.environ.pop(key, None)
+            else:
+                os.environ[key] = value
+
         # Ensure file descriptors are restored even if something goes wrong
         try:
             os.dup2(original_stdout_fd, 1)
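Since these tests already use pytest markers, another option (again, only an illustrative sketch and not part of this diff; the fixture name is hypothetical) is pytest's built-in monkeypatch fixture, which records each setenv call and undoes it automatically at test teardown:

# Hypothetical pytest fixture, not part of this diff: monkeypatch restores the
# environment automatically after each test that requests the fixture.
import pytest


@pytest.fixture
def hyperactor_log_env(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING", "true")
    monkeypatch.setenv("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE", "true")
    monkeypatch.setenv("HYPERACTOR_MESH_TAIL_LOG_LINES", "100")

One behavioral difference to note: the diff restores the variables inside each test's own finally block, while monkeypatch restores them at fixture teardown, i.e. after the test body finishes.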