-
Notifications
You must be signed in to change notification settings - Fork 723
test: Extend Migration Tests to SGLang #4711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+322
−1
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,321 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import logging | ||
| import os | ||
| import shutil | ||
|
|
||
| import pytest | ||
|
|
||
| from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME | ||
| from tests.utils.engine_process import FRONTEND_PORT | ||
| from tests.utils.managed_process import ManagedProcess, terminate_process_tree | ||
| from tests.utils.payloads import check_models_api | ||
|
|
||
| # Import utilities from the refactored utils module | ||
| from .utils import ( | ||
| DynamoFrontendProcess, | ||
| determine_request_receiving_worker, | ||
| start_completion_request, | ||
| validate_completion_response, | ||
| verify_migration_occurred, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| pytestmark = [ | ||
| pytest.mark.sglang, | ||
| pytest.mark.gpu_1, | ||
| pytest.mark.e2e, | ||
| pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), | ||
| pytest.mark.pre_merge, # can be moved to nightly once stable for a week | ||
| ] | ||
|
|
||
|
|
||
| class DynamoWorkerProcess(ManagedProcess): | ||
| """Process manager for Dynamo worker with SGLang backend""" | ||
|
|
||
| def __init__(self, request, worker_id: str, migration_limit: int = 3): | ||
| self.worker_id = worker_id | ||
|
|
||
| command = [ | ||
| "python3", | ||
| "-m", | ||
| "dynamo.sglang", | ||
| "--model-path", | ||
| FAULT_TOLERANCE_MODEL_NAME, | ||
| "--served-model-name", | ||
| FAULT_TOLERANCE_MODEL_NAME, | ||
| "--trust-remote-code", | ||
| "--skip-tokenizer-init", | ||
| "--mem-fraction-static", | ||
| "0.45", | ||
| "--context-length", | ||
| "8192", | ||
| "--migration-limit", | ||
| str(migration_limit), | ||
| ] | ||
|
|
||
| # Set debug logging environment | ||
| env = os.environ.copy() | ||
| env["DYN_LOG"] = "debug" | ||
| env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' | ||
| env["DYN_SYSTEM_PORT"] = f"808{worker_id[-1]}" | ||
|
|
||
| # TODO: Have the managed process take a command name explicitly to distinguish | ||
| # between processes started with the same command. | ||
| log_dir = f"{request.node.name}_{worker_id}" | ||
|
|
||
| # Clean up any existing log directory from previous runs | ||
| try: | ||
| shutil.rmtree(log_dir) | ||
| logger.info(f"Cleaned up existing log directory: {log_dir}") | ||
| except FileNotFoundError: | ||
| # Directory doesn't exist, which is fine | ||
| pass | ||
|
|
||
| super().__init__( | ||
| command=command, | ||
| env=env, | ||
| health_check_urls=[ | ||
| (f"http://localhost:{FRONTEND_PORT}/v1/models", check_models_api), | ||
| (f"http://localhost:808{worker_id[-1]}/health", self.is_ready), | ||
| ], | ||
| timeout=300, | ||
| display_output=True, | ||
| terminate_existing=False, | ||
| stragglers=["SGLANG:EngineCore"], | ||
| straggler_commands=["-m dynamo.sglang"], | ||
| log_dir=log_dir, | ||
| ) | ||
|
|
||
| def get_pid(self): | ||
| """Get the PID of the worker process""" | ||
| return self.proc.pid if self.proc else None | ||
|
|
||
| def is_ready(self, response) -> bool: | ||
| """Check the health of the worker process""" | ||
| try: | ||
| data = response.json() | ||
| if data.get("status") == "ready": | ||
| logger.info(f"{self.worker_id} status is ready") | ||
| return True | ||
| logger.warning( | ||
| f"{self.worker_id} status is not ready: {data.get('status')}" | ||
| ) | ||
| except ValueError: | ||
| logger.warning(f"{self.worker_id} health response is not valid JSON") | ||
| return False | ||
|
|
||
|
|
||
| def test_request_migration_sglang_worker_failure( | ||
| request, runtime_services, predownload_models, set_ucx_tls_no_mm | ||
| ): | ||
| """ | ||
| End-to-end test for worker fault tolerance with migration support using SGLang. | ||
|
|
||
| This test verifies that when a worker is killed during request processing, | ||
| the system can handle the failure gracefully and migrate the request to | ||
| another worker. | ||
| """ | ||
|
|
||
| # Step 1: Start the frontend | ||
| with DynamoFrontendProcess(request) as frontend: | ||
| logger.info("Frontend started successfully") | ||
|
|
||
| # Step 2: Start 2 workers sequentially | ||
| with DynamoWorkerProcess(request, "worker1") as worker1: | ||
| logger.info(f"Worker 1 PID: {worker1.get_pid()}") | ||
|
|
||
| with DynamoWorkerProcess(request, "worker2") as worker2: | ||
| logger.info(f"Worker 2 PID: {worker2.get_pid()}") | ||
|
|
||
| # Step 3: Send the request | ||
| request_thread, response_list = start_completion_request() | ||
|
|
||
| # Step 4: Use polling to determine which worker received the request | ||
| worker, worker_name = determine_request_receiving_worker( | ||
| worker1, worker2, receiving_pattern="New Request ID: " | ||
| ) | ||
|
|
||
| # Step 5: Kill the worker that has the request | ||
| logger.info( | ||
| f"Killing {worker_name} with PID {worker.get_pid()} processing the request" | ||
| ) | ||
| terminate_process_tree(worker.get_pid(), immediate_kill=True, timeout=0) | ||
|
|
||
| # Step 6: Validate the completion response | ||
| validate_completion_response(request_thread, response_list) | ||
|
|
||
| # Step 7: Verify migration occurred | ||
| verify_migration_occurred(frontend) | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="SGLang graceful shutdown not yet implemented") | ||
| def test_request_migration_sglang_graceful_shutdown( | ||
| request, runtime_services, predownload_models, set_ucx_tls_no_mm | ||
| ): | ||
| """ | ||
| End-to-end test for worker fault tolerance with graceful shutdown and migration support using SGLang. | ||
|
|
||
| This test verifies that when a worker receives a graceful shutdown signal (SIGTERM) | ||
| during request processing, the system can handle the shutdown gracefully and migrate | ||
| the request to another worker. Unlike the abrupt kill test, this simulates a more | ||
| controlled shutdown scenario where the worker has time to clean up and notify the | ||
| system about its shutdown. | ||
| """ | ||
|
|
||
| # Step 1: Start the frontend | ||
| with DynamoFrontendProcess(request) as frontend: | ||
| logger.info("Frontend started successfully") | ||
|
|
||
| # Step 2: Start 2 workers sequentially | ||
| with DynamoWorkerProcess(request, "worker1") as worker1: | ||
| logger.info(f"Worker 1 PID: {worker1.get_pid()}") | ||
|
|
||
| with DynamoWorkerProcess(request, "worker2") as worker2: | ||
| logger.info(f"Worker 2 PID: {worker2.get_pid()}") | ||
|
|
||
| # Step 3: Send the request | ||
| request_thread, response_list = start_completion_request() | ||
|
|
||
| # Step 4: Use polling to determine which worker received the request | ||
| worker, worker_name = determine_request_receiving_worker( | ||
| worker1, worker2, receiving_pattern="New Request ID: " | ||
| ) | ||
|
|
||
| # Step 5: Gracefully shutdown the worker that has the request | ||
| logger.info( | ||
| f"Gracefully shutting down {worker_name} with PID {worker.get_pid()} processing the request" | ||
| ) | ||
| terminate_process_tree( | ||
| worker.get_pid(), immediate_kill=False, timeout=10 | ||
| ) | ||
|
|
||
| # Step 6: Validate the completion response | ||
| validate_completion_response(request_thread, response_list) | ||
|
|
||
| # Step 7: Verify migration occurred during graceful shutdown | ||
| verify_migration_occurred(frontend) | ||
|
|
||
|
|
||
| def test_no_request_migration_sglang_worker_failure( | ||
| request, runtime_services, predownload_models, set_ucx_tls_no_mm | ||
| ): | ||
| """ | ||
| End-to-end test for worker fault tolerance with migration disabled using SGLang. | ||
|
|
||
| This test verifies that when migration is disabled (migration_limit=0) and a worker | ||
| is killed during request processing, the request fails as expected without migration. | ||
| This is the opposite behavior of test_request_migration_sglang_worker_failure. | ||
| """ | ||
|
|
||
| # Step 1: Start the frontend | ||
| with DynamoFrontendProcess(request) as frontend: | ||
| logger.info("Frontend started successfully") | ||
|
|
||
| # Step 2: Start 2 workers sequentially with migration disabled | ||
| with DynamoWorkerProcess(request, "worker1", migration_limit=0) as worker1: | ||
| logger.info(f"Worker 1 PID: {worker1.get_pid()}") | ||
|
|
||
| with DynamoWorkerProcess(request, "worker2", migration_limit=0) as worker2: | ||
| logger.info(f"Worker 2 PID: {worker2.get_pid()}") | ||
|
|
||
| # Step 3: Send the request | ||
| request_thread, response_list = start_completion_request() | ||
|
|
||
| # Step 4: Use polling to determine which worker received the request | ||
| worker, worker_name = determine_request_receiving_worker( | ||
| worker1, worker2, receiving_pattern="New Request ID: " | ||
| ) | ||
|
|
||
| # Step 5: Kill the worker that has the request | ||
| logger.info( | ||
| f"Killing {worker_name} with PID {worker.get_pid()} processing the request" | ||
| ) | ||
| terminate_process_tree(worker.get_pid(), immediate_kill=True, timeout=0) | ||
|
|
||
| # Step 6: Validate the completion response - should fail without migration | ||
| try: | ||
| validate_completion_response(request_thread, response_list) | ||
| pytest.fail( | ||
| "Request succeeded unexpectedly when migration was disabled" | ||
| ) | ||
| except AssertionError as e: | ||
| assert "Request failed with status 500: " in str( | ||
| e | ||
| ), f"Unexpected request error message: {e}" | ||
|
|
||
| # Step 7: Verify migration did NOT occur - should fail | ||
| try: | ||
| verify_migration_occurred(frontend) | ||
| pytest.fail( | ||
| "Migration verification unexpectedly passed when migration was disabled" | ||
| ) | ||
| except AssertionError as e: | ||
| assert "'Cannot recreate stream: ...' error found in logs" in str( | ||
| e | ||
| ), f"Unexpected migration message: {e}" | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="SGLang graceful shutdown not yet implemented") | ||
| def test_no_request_migration_sglang_graceful_shutdown( | ||
| request, runtime_services, predownload_models, set_ucx_tls_no_mm | ||
| ): | ||
| """ | ||
| End-to-end test for worker fault tolerance with graceful shutdown and migration disabled using SGLang. | ||
|
|
||
| This test verifies that when migration is disabled (migration_limit=0) and a worker | ||
| receives a graceful shutdown signal (SIGTERM) during request processing, the request | ||
| fails as expected without migration. This is the opposite behavior of | ||
| test_request_migration_sglang_graceful_shutdown. | ||
| """ | ||
|
|
||
| # Step 1: Start the frontend | ||
| with DynamoFrontendProcess(request) as frontend: | ||
| logger.info("Frontend started successfully") | ||
|
|
||
| # Step 2: Start 2 workers sequentially with migration disabled | ||
| with DynamoWorkerProcess(request, "worker1", migration_limit=0) as worker1: | ||
| logger.info(f"Worker 1 PID: {worker1.get_pid()}") | ||
|
|
||
| with DynamoWorkerProcess(request, "worker2", migration_limit=0) as worker2: | ||
| logger.info(f"Worker 2 PID: {worker2.get_pid()}") | ||
|
|
||
| # Step 3: Send the request | ||
| request_thread, response_list = start_completion_request() | ||
|
|
||
| # Step 4: Use polling to determine which worker received the request | ||
| worker, worker_name = determine_request_receiving_worker( | ||
| worker1, worker2, receiving_pattern="New Request ID: " | ||
| ) | ||
|
|
||
| # Step 5: Gracefully shutdown the worker that has the request | ||
| logger.info( | ||
| f"Gracefully shutting down {worker_name} with PID {worker.get_pid()} processing the request" | ||
| ) | ||
| terminate_process_tree( | ||
| worker.get_pid(), immediate_kill=False, timeout=10 | ||
| ) | ||
|
|
||
| # Step 6: Validate the completion response - should fail without migration | ||
| try: | ||
| validate_completion_response(request_thread, response_list) | ||
| pytest.fail( | ||
| "Request succeeded unexpectedly when migration was disabled" | ||
| ) | ||
| except AssertionError as e: | ||
| assert "Request failed with status 500: " in str( | ||
| e | ||
| ), f"Unexpected request error message: {e}" | ||
|
|
||
| # Step 7: Verify migration did NOT occur - should fail | ||
| try: | ||
| verify_migration_occurred(frontend) | ||
| pytest.fail( | ||
| "Migration verification unexpectedly passed when migration was disabled" | ||
| ) | ||
| except AssertionError as e: | ||
| assert "'Cannot recreate stream: ...' error found in logs" in str( | ||
| e | ||
| ), f"Unexpected migration message: {e}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow! Does that 192 extra tokens cause problems?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it turns out different backends has different definitions in terms of token lengths. In SGLang, the max_tokens don't include the input token, so with max context length = 8192, the input tokens ~10 + output token 8192 exceeded 8192.
I'm dropping this to 8000, which also matched the default length at the cancellation test, making room for the input tokens.