Skip to content

Commit ff42ba4

Browse files
committed
test: Extend migration tests to SGLang
Signed-off-by: Jacky <18255193+kthui@users.noreply.github.com>
1 parent 64779d0 commit ff42ba4

File tree

1 file changed

+331
-0
lines changed

1 file changed

+331
-0
lines changed
Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
import logging
5+
import os
6+
import shutil
7+
8+
import pytest
9+
10+
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
11+
from tests.utils.engine_process import FRONTEND_PORT
12+
from tests.utils.managed_process import ManagedProcess, terminate_process_tree
13+
from tests.utils.payloads import check_models_api
14+
15+
# Import utilities from the refactored utils module
16+
from .utils import (
17+
DynamoFrontendProcess,
18+
determine_request_receiving_worker,
19+
start_completion_request,
20+
validate_completion_response,
21+
verify_migration_occurred,
22+
)
23+
24+
# Module-level logger used by the worker process helper and the tests in this file.
logger = logging.getLogger(__name__)
25+
26+
27+
class DynamoWorkerProcess(ManagedProcess):
    """Process manager for a Dynamo worker with the SGLang backend.

    Builds the ``python3 -m dynamo.sglang`` command line, prepares the
    environment (debug logging, endpoint health status, system port), removes
    any stale per-test log directory and hands everything to
    ``ManagedProcess``.
    """

    def __init__(self, request, worker_id: str, migration_limit: int = 3):
        """Configure the worker process.

        Args:
            request: The pytest ``request`` fixture; ``request.node.name`` is
                used to build the log directory name.
            worker_id: Identifier of the worker (e.g. ``"worker1"``). Its last
                character is appended to ``808`` to form the system port, so
                it must be non-empty.
            migration_limit: Value passed to ``--migration-limit``.
        """
        self.worker_id = worker_id

        # Derive the system port once so that the DYN_SYSTEM_PORT environment
        # variable and the health-check URL below can never drift apart.
        system_port = f"808{worker_id[-1]}"

        command = [
            "python3",
            "-m",
            "dynamo.sglang",
            "--model-path",
            FAULT_TOLERANCE_MODEL_NAME,
            "--served-model-name",
            FAULT_TOLERANCE_MODEL_NAME,
            "--trust-remote-code",
            "--skip-tokenizer-init",
            "--mem-fraction-static",
            "0.45",
            "--context-length",
            "8192",
            "--migration-limit",
            str(migration_limit),
        ]

        # Set debug logging environment
        env = os.environ.copy()
        env["DYN_LOG"] = "debug"
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        env["DYN_SYSTEM_PORT"] = system_port

        # TODO: Have the managed process take a command name explicitly to distinguish
        #       between processes started with the same command.
        log_dir = f"{request.node.name}_{worker_id}"

        # Clean up any existing log directory from previous runs
        try:
            shutil.rmtree(log_dir)
            logger.info(f"Cleaned up existing log directory: {log_dir}")
        except FileNotFoundError:
            # Directory doesn't exist, which is fine
            pass

        super().__init__(
            command=command,
            env=env,
            # Two (url, checker) pairs: the frontend model listing and this
            # worker's own health endpoint on its system port.
            health_check_urls=[
                (f"http://localhost:{FRONTEND_PORT}/v1/models", check_models_api),
                (f"http://localhost:{system_port}/health", self.is_ready),
            ],
            timeout=300,
            display_output=True,
            terminate_existing=False,
            stragglers=["SGLANG:EngineCore"],
            straggler_commands=["-m dynamo.sglang"],
            log_dir=log_dir,
        )

    def get_pid(self):
        """Return the PID of the worker process, or ``None`` if ``self.proc`` is unset/falsy."""
        return self.proc.pid if self.proc else None

    def is_ready(self, response) -> bool:
        """Check the health of the worker process.

        Args:
            response: An object exposing ``json()``; presumably the HTTP
                response from the worker's ``/health`` endpoint (verify
                against ``ManagedProcess``).

        Returns:
            ``True`` only when the decoded body is a JSON object whose
            ``"status"`` equals ``"ready"``; ``False`` when the body is not
            valid JSON, is not a JSON object, or reports any other status.
        """
        try:
            data = response.json()
        except ValueError:
            logger.warning(f"{self.worker_id} health response is not valid JSON")
            return False

        # A valid-but-non-object payload (list, string, null, ...) has no
        # ``get``; treat it as "no status" rather than raising AttributeError.
        status = data.get("status") if isinstance(data, dict) else None
        if status == "ready":
            logger.info(f"{self.worker_id} status is ready")
            return True

        logger.warning(f"{self.worker_id} status is not ready: {status}")
        return False
101+
102+
103+
@pytest.mark.sglang
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_migration_sglang_worker_failure(
    request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
    """
    End-to-end check of request migration after an abrupt SGLang worker kill.

    A frontend and two workers are started, a completion request is sent, and
    the worker that picked it up is killed immediately. The request is still
    expected to complete, and the migration check against the frontend is
    expected to pass.
    """

    # Frontend comes up first; both workers live inside its context.
    with DynamoFrontendProcess(request) as frontend:
        logger.info("Frontend started successfully")

        # Workers are started one after the other (worker1, then worker2).
        logger.info("Starting worker 1...")
        with DynamoWorkerProcess(request, "worker1") as worker1:
            logger.info(f"Worker 1 PID: {worker1.get_pid()}")

            with DynamoWorkerProcess(request, "worker2") as worker2:
                logger.info(f"Worker 2 PID: {worker2.get_pid()}")

                # Fire the completion request.
                completion_thread, responses = start_completion_request()

                # Find out which of the two workers is serving the request.
                target, target_name = determine_request_receiving_worker(
                    worker1, worker2, receiving_pattern="New Request ID: "
                )

                # Kill that worker right away.
                target_pid = target.get_pid()
                logger.info(
                    f"Killing {target_name} with PID {target_pid} processing the request"
                )
                terminate_process_tree(target_pid, immediate_kill=True, timeout=0)

                # The request must still complete successfully ...
                validate_completion_response(completion_thread, responses)

                # ... and the migration check on the frontend must pass.
                verify_migration_occurred(frontend)
151+
152+
153+
@pytest.mark.sglang
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_migration_sglang_graceful_shutdown(
    request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
    """
    End-to-end check of request migration after a graceful SGLang worker shutdown.

    Same scenario as the abrupt-kill test, except the worker serving the
    request is terminated with ``immediate_kill=False`` and a 10 second
    timeout, simulating a controlled shutdown in which the worker has time to
    clean up. The request is still expected to complete and the migration
    check against the frontend is expected to pass.
    """

    # Frontend comes up first; both workers live inside its context.
    with DynamoFrontendProcess(request) as frontend:
        logger.info("Frontend started successfully")

        # Workers are started one after the other (worker1, then worker2).
        with DynamoWorkerProcess(request, "worker1") as worker1:
            logger.info(f"Worker 1 PID: {worker1.get_pid()}")

            with DynamoWorkerProcess(request, "worker2") as worker2:
                logger.info(f"Worker 2 PID: {worker2.get_pid()}")

                # Fire the completion request.
                completion_thread, responses = start_completion_request()

                # Find out which of the two workers is serving the request.
                target, target_name = determine_request_receiving_worker(
                    worker1, worker2, receiving_pattern="New Request ID: "
                )

                # Ask that worker to shut down gracefully.
                target_pid = target.get_pid()
                logger.info(
                    f"Gracefully shutting down {target_name} with PID {target_pid} processing the request"
                )
                terminate_process_tree(target_pid, immediate_kill=False, timeout=10)

                # The request must still complete successfully ...
                validate_completion_response(completion_thread, responses)

                # ... and the migration check on the frontend must pass.
                verify_migration_occurred(frontend)
202+
203+
204+
@pytest.mark.sglang
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_no_request_migration_sglang_worker_failure(
    request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
    """
    End-to-end check that an abrupt SGLang worker kill fails the request when migration is off.

    Both workers are started with ``migration_limit=0``. After the worker
    serving the request is killed immediately, the request is expected to fail
    with a status 500 error and the migration check is expected to fail too.
    This is the counterpart of test_request_migration_sglang_worker_failure.
    """

    # Frontend comes up first; both workers live inside its context.
    with DynamoFrontendProcess(request) as frontend:
        logger.info("Frontend started successfully")

        # Workers are started one after the other, both with migration disabled.
        logger.info("Starting worker 1 with migration disabled...")
        with DynamoWorkerProcess(request, "worker1", migration_limit=0) as worker1:
            logger.info(f"Worker 1 PID: {worker1.get_pid()}")

            with DynamoWorkerProcess(request, "worker2", migration_limit=0) as worker2:
                logger.info(f"Worker 2 PID: {worker2.get_pid()}")

                # Fire the completion request.
                completion_thread, responses = start_completion_request()

                # Find out which of the two workers is serving the request.
                target, target_name = determine_request_receiving_worker(
                    worker1, worker2, receiving_pattern="New Request ID: "
                )

                # Kill that worker right away.
                target_pid = target.get_pid()
                logger.info(
                    f"Killing {target_name} with PID {target_pid} processing the request"
                )
                terminate_process_tree(target_pid, immediate_kill=True, timeout=0)

                # Without migration the response validation must raise, and
                # the failure must be the expected status 500 error.
                try:
                    validate_completion_response(completion_thread, responses)
                except AssertionError as request_error:
                    request_message = str(request_error)
                    assert (
                        "Request failed with status 500: " in request_message
                    ), f"Unexpected request error message: {request_error}"
                else:
                    pytest.fail(
                        "Request succeeded unexpectedly when migration was disabled"
                    )

                # The migration check must raise as well, with the expected
                # "cannot recreate stream" message.
                try:
                    verify_migration_occurred(frontend)
                except AssertionError as migration_error:
                    migration_message = str(migration_error)
                    assert (
                        "'Cannot recreate stream: ...' error found in logs"
                        in migration_message
                    ), f"Unexpected migration message: {migration_error}"
                else:
                    pytest.fail(
                        "Migration verification unexpectedly passed when migration was disabled"
                    )
266+
267+
268+
@pytest.mark.sglang
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_no_request_migration_sglang_graceful_shutdown(
    request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
    """
    End-to-end check that a graceful SGLang worker shutdown fails the request when migration is off.

    Both workers are started with ``migration_limit=0``. After the worker
    serving the request is terminated with ``immediate_kill=False`` and a
    10 second timeout, the request is expected to fail with a status 500 error
    and the migration check is expected to fail too. This is the counterpart
    of test_request_migration_sglang_graceful_shutdown.
    """

    # Frontend comes up first; both workers live inside its context.
    with DynamoFrontendProcess(request) as frontend:
        logger.info("Frontend started successfully")

        # Workers are started one after the other, both with migration disabled.
        with DynamoWorkerProcess(request, "worker1", migration_limit=0) as worker1:
            logger.info(f"Worker 1 PID: {worker1.get_pid()}")

            with DynamoWorkerProcess(request, "worker2", migration_limit=0) as worker2:
                logger.info(f"Worker 2 PID: {worker2.get_pid()}")

                # Fire the completion request.
                completion_thread, responses = start_completion_request()

                # Find out which of the two workers is serving the request.
                target, target_name = determine_request_receiving_worker(
                    worker1, worker2, receiving_pattern="New Request ID: "
                )

                # Ask that worker to shut down gracefully.
                target_pid = target.get_pid()
                logger.info(
                    f"Gracefully shutting down {target_name} with PID {target_pid} processing the request"
                )
                terminate_process_tree(target_pid, immediate_kill=False, timeout=10)

                # Without migration the response validation must raise, and
                # the failure must be the expected status 500 error.
                try:
                    validate_completion_response(completion_thread, responses)
                except AssertionError as request_error:
                    request_message = str(request_error)
                    assert (
                        "Request failed with status 500: " in request_message
                    ), f"Unexpected request error message: {request_error}"
                else:
                    pytest.fail(
                        "Request succeeded unexpectedly when migration was disabled"
                    )

                # The migration check must raise as well, with the expected
                # "cannot recreate stream" message.
                try:
                    verify_migration_occurred(frontend)
                except AssertionError as migration_error:
                    migration_message = str(migration_error)
                    assert (
                        "'Cannot recreate stream: ...' error found in logs"
                        in migration_message
                    ), f"Unexpected migration message: {migration_error}"
                else:
                    pytest.fail(
                        "Migration verification unexpectedly passed when migration was disabled"
                    )

0 commit comments

Comments
 (0)