[PD Disaggregation] [tmp] decode use cpu buffer to receive cache from prefill #5308

base: develop
Conversation

Thanks for your contribution!
Pull request overview

This PR implements CPU buffer support for decode instances in PD (Prefill-Decode) disaggregation, allowing decode instances to receive cache from prefill instances via CPU memory instead of directly into GPU memory. This feature is controlled by the --splitwise-cache-buffer-size parameter and works with the RDMA cache transfer protocol.

Key Changes:
- Added CPU pinned shared memory operations (create_pinned_shm, open_pinned_shm) for inter-process communication (see the sketch after this list)
- Extended cache manager components to support splitwise CPU buffer allocation and swapping
- Enhanced RDMA communicator to handle different tensor parallelism sizes between prefill and decode
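To make the IPC surface concrete, here is a minimal sketch of the create/open round trip. It assumes the signatures from this PR's diff (`create_pinned_shm(shm_name, byte_size) -> address`, `open_pinned_shm(shm_name, byte_size) -> address`) re-exported through fastdeploy.cache_manager.ops, and a CUDA device present (cudaHostRegister requires one); the region name, size, and single-process framing are illustrative only.

```python
# Minimal sketch of the pinned-shm IPC round trip. create_pinned_shm /
# open_pinned_shm follow this PR's signatures; everything else here
# (region name, size, single-process framing) is hypothetical.
import ctypes

from fastdeploy.cache_manager.ops import create_pinned_shm, open_pinned_shm

SHM_NAME = "/fd_splitwise_demo"   # hypothetical POSIX shm name
BYTE_SIZE = 1 << 20               # 1 MiB for illustration

# Writer side (e.g., the process receiving cache over RDMA): create the
# CUDA-registered, shm-backed buffer and fill the first bytes.
writer_addr = create_pinned_shm(SHM_NAME, BYTE_SIZE)
writer_view = (ctypes.c_char * BYTE_SIZE).from_address(writer_addr)
writer_view[0:5] = b"hello"

# Reader side (normally a different process): map the same region by name
# and observe the writer's bytes without any copy.
reader_addr = open_pinned_shm(SHM_NAME, BYTE_SIZE)
reader_view = (ctypes.c_char * BYTE_SIZE).from_address(reader_addr)
assert reader_view[0:5] == b"hello"
```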
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 30 comments.
| File | Description |
|---|---|
| tests/e2e/test_ernie_03b_pd_router_v1_cpu_buffer.py | New end-to-end test for CPU buffer feature with router-based splitwise deployment |
| tests/ci_use/Qwen2-7B-Instruct_offline/test_Qwen2-7B-Instruct_offline.py | Added cleanup call in test teardown |
| tests/cache_manager/test_cache_transfer_manager.py | Added splitwise_cache_buffer_block_num parameter to test Args class |
| fastdeploy/envs.py | Added FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDES environment variable |
| fastdeploy/engine/args_utils.py | Added splitwise-cache-buffer-size CLI argument with validation |
| fastdeploy/config.py | Added cache buffer configuration fields and block number calculation |
| fastdeploy/router/utils.py | Enhanced InstanceInfo with tp_size field and from_dict factory method |
| fastdeploy/router/router.py | Added tp_size validation and IPC protocol compatibility checks |
| fastdeploy/splitwise/splitwise_connector.py | Enhanced logging and timeout configuration for decode resource allocation |
| fastdeploy/engine/common_engine.py | Refactored prefilled request processing to prepare CPU buffer swapping |
| fastdeploy/engine/sched/resource_manager_v1.py | Implemented CPU buffer allocation and GPU swapping for decode instances |
| fastdeploy/cache_manager/prefix_cache_manager.py | Added splitwise CPU buffer management and conditional cache transfer manager launching |
| fastdeploy/cache_manager/cache_transfer_manager.py | Implemented CPU cache initialization with pinned shared memory support |
| fastdeploy/cache_manager/cache_messager.py | Enhanced to register and manage splitwise CPU buffer for RDMA operations |
| fastdeploy/cache_manager/cache_data.py | Added SPLITWISE_CPU2GPU cache status enum value |
| fastdeploy/cache_manager/ops.py | Exported create_pinned_shm and open_pinned_shm functions |
| custom_ops/gpu_ops/cpp_extensions.cc | Implemented pinned shared memory creation/opening/cleanup functions |
| fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/kvcache_rdma.h | Added tp_size fields for handling different parallelism configurations |
| fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/kvcache_rdma.cpp | Implemented offset calculation for different tp_size scenarios and fixed connection mutex placement |
| fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/pybind.cpp | Enhanced Python bindings with explicit parameter names and GIL release guards |
| print("🟢 接收响应中...\n") | ||
| return res | ||
| except requests.exceptions.Timeout: | ||
| print(f"❌ 请求超时(超过 {timeout} 秒)") |
Copilot AI commented on Dec 1, 2025:
Chinese comment detected. Consider translating to English: "请求超时(超过 {timeout} 秒)" should be "Request timeout (exceeded {timeout} seconds)"
| print(f"❌ 请求超时(超过 {timeout} 秒)") | |
| print(f"❌ Request timeout (exceeded {timeout} seconds)") |
```cpp
// Create a shared memory region and register it with CUDA
// The pinned shm can be shared between processes
uintptr_t create_pinned_shm(const char* shm_name, size_t byte_size) {
  int fd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
  if (fd < 0) throw std::runtime_error("shm_open failed");

  if (ftruncate(fd, byte_size) != 0) {
    close(fd);
    throw std::runtime_error("ftruncate failed");
  }

  void* addr =
      mmap(nullptr, byte_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (addr == MAP_FAILED) {
    close(fd);
    throw std::runtime_error("mmap failed");
  }

  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));

  close(fd);
  return reinterpret_cast<uintptr_t>(addr);
}

uintptr_t open_pinned_shm(const char* shm_name, size_t byte_size) {
  int fd = shm_open(shm_name, O_RDWR, 0666);
  if (fd < 0) throw std::runtime_error("shm_open failed");

  void* addr =
      mmap(nullptr, byte_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (addr == MAP_FAILED) {
    close(fd);
    throw std::runtime_error("mmap failed");
  }

  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));

  close(fd);
  return reinterpret_cast<uintptr_t>(addr);
}

void free_pinned_shm(const char* shm_name,
                     uintptr_t addr_uint,
                     size_t byte_size) {
  void* addr = reinterpret_cast<void*>(addr_uint);

  check_cuda_error(cudaHostUnregister(addr));

  if (munmap(addr, byte_size) != 0) throw std::runtime_error("munmap failed");

  if (shm_unlink(shm_name) != 0) throw std::runtime_error("shm_unlink failed");
}
```
Copilot AI commented on Dec 1, 2025:
The new C++ functions create_pinned_shm, open_pinned_shm, and free_pinned_shm in cpp_extensions.cc lack unit test coverage. Consider adding tests to verify:
- Successful creation and opening of shared memory
- Error handling when operations fail (e.g., shm_open, mmap, cudaHostRegister failures)
- Proper cleanup and resource release
- Concurrent access scenarios
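A starting point for the round-trip and error-path cases could look like the sketch below. It assumes create_pinned_shm and open_pinned_shm are importable from fastdeploy.cache_manager.ops (as the file summary above indicates), that free_pinned_shm is exposed the same way (an assumption), and that a CUDA device is present, since the functions call cudaHostRegister.

```python
# Hedged test sketch; function names follow this PR, the rest is illustrative.
import ctypes
import os

import pytest

from fastdeploy.cache_manager.ops import (  # free_pinned_shm export assumed
    create_pinned_shm,
    free_pinned_shm,
    open_pinned_shm,
)


def test_pinned_shm_round_trip():
    name = f"/fd_test_pinned_{os.getpid()}"  # unique name tolerates reruns
    size = 4096

    addr = create_pinned_shm(name, size)
    try:
        (ctypes.c_char * size).from_address(addr)[0:4] = b"\x01\x02\x03\x04"

        # A second mapping of the same region, as a peer process would get.
        # (A thorough test would also unregister/unmap this peer mapping.)
        peer = open_pinned_shm(name, size)
        peer_view = (ctypes.c_char * size).from_address(peer)
        assert peer_view[0:4] == b"\x01\x02\x03\x04"
    finally:
        free_pinned_shm(name, addr, size)


def test_open_missing_region_fails():
    # shm_open without O_CREAT on a missing name throws std::runtime_error,
    # which typically surfaces as RuntimeError in Python bindings.
    with pytest.raises(RuntimeError):
        open_pinned_shm("/fd_test_pinned_missing", 4096)
```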
```python
def send_request(url, payload, timeout=600):
    """
    发送请求到指定的URL,并返回响应结果。
```
Copilot AI commented on Dec 1, 2025:
Chinese comment detected. Consider translating to English for consistency: "发送请求到指定的URL,并返回响应结果。" should be "Send a request to the specified URL and return the response result."
Suggested change:

```diff
-    发送请求到指定的URL,并返回响应结果。
+    Send a request to the specified URL and return the response result.
```
```cpp
  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));

  close(fd);
```
Copilot AI commented on Dec 1, 2025:
[nitpick] The close(fd) call should be done before line 76 (after the mmap), not after cudaHostRegister. File descriptors from shm_open can be closed immediately after mmap completes, as the mapping maintains its own reference. The current placement is correct but the ordering could be improved for clarity. Consider moving it right after mmap (line 70-75 area) to follow best practices.
Suggested change:

```diff
-  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));
-
-  close(fd);
+  close(fd);
+
+  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));
```
```python
        if self.splitwise_cache_buffer_size is not None and self.splitwise_cache_buffer_size < 0.0:
            raise ValueError(
                "splitwise_cache_buffer_size must be greater than 0.0. Got " f"{self.splitwise_cache_buffer_size}."
```
Copilot AI commented on Dec 1, 2025:
Typo in the error message: "must be greater than 0.0" should be "must be non-negative" or "must be greater than or equal to 0.0" since the condition checks < 0.0. Alternatively, if the intention is to require positive values, change the condition to <= 0.0.
| "splitwise_cache_buffer_size must be greater than 0.0. Got " f"{self.splitwise_cache_buffer_size}." | |
| "splitwise_cache_buffer_size must be non-negative. Got " f"{self.splitwise_cache_buffer_size}." |
Motivation

Decode instances use a CPU buffer to receive cache from prefill instances, rather than receiving it directly into GPU memory.

Modifications

Usage or Command

Decode instances can pass the --splitwise-cache-buffer-size 10 argument, as in the sketch below.
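As a rough configuration sketch (not a verified invocation): only splitwise_cache_buffer_size is introduced by this PR; the surrounding argument names are assumptions based on the existing splitwise and RDMA options referenced in this PR's file summary.

```python
# Hedged sketch of configuring a decode instance. Only
# splitwise_cache_buffer_size is new in this PR; the other field
# names are assumed from existing splitwise/RDMA options.
from fastdeploy.engine.args_utils import EngineArgs

args = EngineArgs(
    model="./your-model-path",        # placeholder
    splitwise_role="decode",          # assumed existing splitwise option
    cache_transfer_protocol="rdma",   # this PR targets the RDMA path
    splitwise_cache_buffer_size=10,   # new flag: CPU buffer size for decode
)
```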
Accuracy Tests

Refer to the unit tests.
Checklist

- Add at least one tag in the PR title, chosen from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run pre-commit before commit.
- If the PR targets a release branch, make sure it has been submitted to the develop branch first, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.