Conversation

@juncaipeng
Collaborator

Motivation

The decode instance uses a CPU buffer to receive the KV cache transferred from the prefill instance.

Modifications

  • Add create_pinned_shm and open_pinned_shm ops (see the usage sketch after this list)
  • Support the splitwise CPU cache buffer in cache_messager and cache_transfer_manager
  • Support the splitwise CPU cache buffer in resource_manager_v1 and prefix_cache_manager.py
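
A minimal sketch of the intended cross-process usage, assuming the create_pinned_shm/open_pinned_shm exports from fastdeploy/cache_manager/ops.py; the signatures are inferred from the C++ extension reviewed below and may not match the real bindings exactly:

    # Hypothetical sketch: one process creates the CUDA-registered shared
    # buffer, another opens the same region by name.
    import ctypes

    from fastdeploy.cache_manager.ops import create_pinned_shm, open_pinned_shm

    SHM_NAME = "/splitwise_cache_buffer_demo"  # hypothetical region name
    NBYTES = 1 << 20                           # 1 MiB demo buffer

    # Producer side (e.g., cache_transfer_manager): create and register.
    addr = create_pinned_shm(SHM_NAME, NBYTES)

    # Consumer side (e.g., cache_messager): map the same region by name.
    peer = open_pinned_shm(SHM_NAME, NBYTES)

    # Both mappings view the same physical pages, so bytes written through
    # one address are visible through the other.
    ctypes.memmove(addr, b"kv-bytes", 8)
    assert bytes((ctypes.c_char * 8).from_address(peer)) == b"kv-bytes"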

Usage or Command

A decode instance can enable the CPU cache buffer by passing, e.g., --splitwise-cache-buffer-size 10; the sketch below illustrates how that value might map to a block count.
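
For illustration, a hedged sketch of how the flag's value could translate into a number of CPU cache blocks; the real calculation lives in fastdeploy/config.py, and the GiB unit and helper name here are assumptions:

    # Hypothetical helper mirroring the block number calculation added to
    # fastdeploy/config.py; the GiB unit and the name are assumptions.
    def splitwise_cache_buffer_block_num(buffer_size_gb, bytes_per_block):
        if buffer_size_gb is None:
            return 0  # feature disabled
        return int(buffer_size_gb * (1 << 30)) // bytes_per_block

    # e.g. a 10 GiB buffer with 2 MiB KV blocks yields 5120 CPU blocks
    assert splitwise_cache_buffer_block_num(10, 2 * (1 << 20)) == 5120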

Accuracy Tests

Refer to the unit tests.

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code; run pre-commit before committing.
  • Add unit tests; if none are added, explain the reason in this PR.
  • Provide accuracy results.
  • If the current PR targets a release branch, make sure it has first been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings December 1, 2025 04:07
@paddle-bot

paddle-bot bot commented Dec 1, 2025

Thanks for your contribution!

Copilot finished reviewing on behalf of juncaipeng December 1, 2025 04:12
Contributor

Copilot AI left a comment

Pull request overview

This PR implements CPU buffer support for decode instances in PD (Prefill-Decode) disaggregation, allowing decode instances to receive the KV cache from prefill instances into CPU memory rather than directly into GPU memory. The feature is controlled by the --splitwise-cache-buffer-size parameter and works with the RDMA cache transfer protocol.

Key Changes:

  • Added CPU pinned shared memory operations (create_pinned_shm, open_pinned_shm) for inter-process communication
  • Extended cache manager components to support splitwise CPU buffer allocation and swapping
  • Enhanced the RDMA communicator to handle different tensor parallelism sizes between prefill and decode (a mapping sketch follows this list)
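
To make the last point concrete, here is a hedged sketch of one plausible head-to-rank mapping when the prefill and decode tensor-parallel sizes differ; the actual offset calculation is in kvcache_rdma.cpp and may differ:

    # Hypothetical sketch: map the KV heads owned by one decode rank onto
    # the prefill ranks that hold them, assuming heads split evenly across
    # ranks. The real offset logic in kvcache_rdma.cpp may differ.
    def locate_prefill_slices(total_kv_heads, prefill_tp, decode_tp, decode_rank):
        heads_per_decode = total_kv_heads // decode_tp
        heads_per_prefill = total_kv_heads // prefill_tp
        first = decode_rank * heads_per_decode
        # (prefill_rank, head_offset_within_rank) for each head this rank needs
        return [(h // heads_per_prefill, h % heads_per_prefill)
                for h in range(first, first + heads_per_decode)]

    # e.g. 8 KV heads, prefill tp=4, decode tp=2: decode rank 1 needs heads
    # 4..7, which live on prefill ranks 2 and 3.
    assert locate_prefill_slices(8, 4, 2, 1) == [(2, 0), (2, 1), (3, 0), (3, 1)]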

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 30 comments.

  • tests/e2e/test_ernie_03b_pd_router_v1_cpu_buffer.py: New end-to-end test for the CPU buffer feature with a router-based splitwise deployment
  • tests/ci_use/Qwen2-7B-Instruct_offline/test_Qwen2-7B-Instruct_offline.py: Added a cleanup call in test teardown
  • tests/cache_manager/test_cache_transfer_manager.py: Added the splitwise_cache_buffer_block_num parameter to the test Args class
  • fastdeploy/envs.py: Added the FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDES environment variable
  • fastdeploy/engine/args_utils.py: Added the splitwise-cache-buffer-size CLI argument with validation
  • fastdeploy/config.py: Added cache buffer configuration fields and block number calculation
  • fastdeploy/router/utils.py: Enhanced InstanceInfo with a tp_size field and a from_dict factory method
  • fastdeploy/router/router.py: Added tp_size validation and IPC protocol compatibility checks
  • fastdeploy/splitwise/splitwise_connector.py: Enhanced logging and timeout configuration for decode resource allocation
  • fastdeploy/engine/common_engine.py: Refactored prefilled request processing to prepare CPU buffer swapping
  • fastdeploy/engine/sched/resource_manager_v1.py: Implemented CPU buffer allocation and GPU swapping for decode instances
  • fastdeploy/cache_manager/prefix_cache_manager.py: Added splitwise CPU buffer management and conditional cache transfer manager launching
  • fastdeploy/cache_manager/cache_transfer_manager.py: Implemented CPU cache initialization with pinned shared memory support
  • fastdeploy/cache_manager/cache_messager.py: Enhanced to register and manage the splitwise CPU buffer for RDMA operations
  • fastdeploy/cache_manager/cache_data.py: Added the SPLITWISE_CPU2GPU cache status enum value
  • fastdeploy/cache_manager/ops.py: Exported the create_pinned_shm and open_pinned_shm functions
  • custom_ops/gpu_ops/cpp_extensions.cc: Implemented pinned shared memory creation/opening/cleanup functions
  • fastdeploy/cache_manager/transfer_factory/kvcache_transfer/include/kvcache_rdma.h: Added tp_size fields for handling different parallelism configurations
  • fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/kvcache_rdma.cpp: Implemented offset calculation for different tp_size scenarios and fixed connection mutex placement
  • fastdeploy/cache_manager/transfer_factory/kvcache_transfer/src/pybind.cpp: Enhanced Python bindings with explicit parameter names and GIL release guards

        print("🟢 接收响应中...\n")
        return res
    except requests.exceptions.Timeout:
        print(f"❌ 请求超时(超过 {timeout} 秒)")

Copilot AI Dec 1, 2025

Chinese comment detected. Consider translating to English: "请求超时(超过 {timeout} 秒)" should be "Request timeout (exceeded {timeout} seconds)"

Suggested change
-    print(f"❌ 请求超时(超过 {timeout} 秒)")
+    print(f"❌ Request timeout (exceeded {timeout} seconds)")

Comment on lines +58 to +109
// Create a shared memory region and register it with CUDA
// The pinned shm can be shared between processes
uintptr_t create_pinned_shm(const char* shm_name, size_t byte_size) {
  int fd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
  if (fd < 0) throw std::runtime_error("shm_open failed");

  if (ftruncate(fd, byte_size) != 0) {
    close(fd);
    throw std::runtime_error("ftruncate failed");
  }

  void* addr =
      mmap(nullptr, byte_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (addr == MAP_FAILED) {
    close(fd);
    throw std::runtime_error("mmap failed");
  }

  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));

  close(fd);
  return reinterpret_cast<uintptr_t>(addr);
}

uintptr_t open_pinned_shm(const char* shm_name, size_t byte_size) {
  int fd = shm_open(shm_name, O_RDWR, 0666);
  if (fd < 0) throw std::runtime_error("shm_open failed");

  void* addr =
      mmap(nullptr, byte_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (addr == MAP_FAILED) {
    close(fd);
    throw std::runtime_error("mmap failed");
  }

  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));

  close(fd);
  return reinterpret_cast<uintptr_t>(addr);
}

void free_pinned_shm(const char* shm_name,
                     uintptr_t addr_uint,
                     size_t byte_size) {
  void* addr = reinterpret_cast<void*>(addr_uint);

  check_cuda_error(cudaHostUnregister(addr));

  if (munmap(addr, byte_size) != 0) throw std::runtime_error("munmap failed");

  if (shm_unlink(shm_name) != 0) throw std::runtime_error("shm_unlink failed");
}

Copilot AI Dec 1, 2025

The new C++ functions create_pinned_shm, open_pinned_shm, and free_pinned_shm in cpp_extensions.cc lack unit test coverage. Consider adding tests to verify the following (a minimal sketch follows this list):

  1. Successful creation and opening of shared memory
  2. Error handling when operations fail (e.g., shm_open, mmap, cudaHostRegister failures)
  3. Proper cleanup and resource release
  4. Concurrent access scenarios
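
A minimal round-trip test sketch along these lines, assuming the Python exports from fastdeploy/cache_manager/ops.py (signatures are inferred from the C++ code above, and a CUDA-capable host is required because the functions call cudaHostRegister):

    # Hypothetical test sketch; binding signatures are assumptions.
    import ctypes

    from fastdeploy.cache_manager.ops import create_pinned_shm, open_pinned_shm

    def test_pinned_shm_round_trip():
        name, size = "/fd_pinned_shm_test", 4096
        writer = create_pinned_shm(name, size)  # create + CUDA-register
        reader = open_pinned_shm(name, size)    # second mapping by name
        ctypes.memmove(writer, b"hello", 5)     # write through one mapping
        data = bytes((ctypes.c_char * 5).from_address(reader))
        assert data == b"hello"                 # visible through the other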

def send_request(url, payload, timeout=600):
    """
    发送请求到指定的URL,并返回响应结果。

Copilot AI Dec 1, 2025

Chinese comment detected. Consider translating to English for consistency: "发送请求到指定的URL,并返回响应结果。" should be "Send a request to the specified URL and return the response result."

Suggested change
-    发送请求到指定的URL,并返回响应结果。
+    Send a request to the specified URL and return the response result.

Comment on lines +75 to +78

  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));

  close(fd);

Copilot AI Dec 1, 2025

[nitpick] The close(fd) call could be moved to immediately after the mmap call rather than after cudaHostRegister. A file descriptor returned by shm_open can be closed as soon as mmap completes, because the mapping maintains its own reference to the shared memory object. The current placement is functionally correct, but closing the descriptor right after mapping follows best practice and reads more clearly. (A short Python demonstration of the fd-lifetime point follows the suggestion below.)

Suggested change
-  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));
-  close(fd);
+  close(fd);
+  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));
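
For reference, the fd-lifetime behavior this nitpick relies on can be demonstrated with the Python standard library alone (unrelated to the FastDeploy bindings):

    # Demonstrates that a memory mapping stays valid after the backing fd
    # is closed: the mapping holds its own reference to the file object.
    import mmap
    import os
    import tempfile

    fd, path = tempfile.mkstemp()
    os.ftruncate(fd, 4096)
    view = mmap.mmap(fd, 4096)
    os.close(fd)              # safe: the mapping remains usable
    view[:5] = b"hello"
    assert view[:5] == b"hello"
    view.close()
    os.unlink(path)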

if self.splitwise_cache_buffer_size is not None and self.splitwise_cache_buffer_size < 0.0:
    raise ValueError(
        "splitwise_cache_buffer_size must be greater than 0.0. Got " f"{self.splitwise_cache_buffer_size}."

Copilot AI Dec 1, 2025

Typo in the error message: "must be greater than 0.0" should be "must be non-negative" or "must be greater than or equal to 0.0" since the condition checks < 0.0. Alternatively, if the intention is to require positive values, change the condition to <= 0.0.

Suggested change
-    "splitwise_cache_buffer_size must be greater than 0.0. Got " f"{self.splitwise_cache_buffer_size}."
+    "splitwise_cache_buffer_size must be non-negative. Got " f"{self.splitwise_cache_buffer_size}."
