
Commit 29be755

[enhance]optimize kvstar core bind method & delta kvcache swap (#330)
* delta kvcache block swap
* clean code
* add core bind method
* clean code
1 parent 2ec56df commit 29be755

File tree

7 files changed, +171 -78 lines changed

ucm/sparse/kvstar/multistep.py

Lines changed: 37 additions & 6 deletions

@@ -17,7 +17,7 @@
     UcmSparseRole,
 )
 from ucm.sparse.kvstar.retrieve import kvstar_retrieve
-from ucm.sparse.kvstar.utils import bind_cpus, block_hash_func, get_offset
+from ucm.sparse.kvstar.utils import block_hash_func, get_bind_cpus_for_rank, get_offset
 from ucm.store.ucmstore import Task, UcmKVStoreBase

 """
@@ -217,6 +217,10 @@ def __init__(

         self.num_blocks_dumped = 0

+        self.layer_wise_pre_swap_area_block_hashes: Dict[int, str] = (
+            {}
+        )  # key: block id, value: block hash id
+
     @classmethod
     def block_hash(cls, request_id, block_id):
         return f"req_{request_id}_blk_{block_id}"
@@ -441,8 +445,37 @@ def load_retrieve_result_async(self, load_step, candidate_swap_vllm_block_ids):
             assert 0
         retrieve_result_hash_list = self.step_group_retrieve_result.get(
             need_retrieve_record
-        )
+        ).copy()
         if need_retrieve_record != "prefill" or load_step == 1:
+            if len(self.layer_wise_pre_swap_area_block_hashes) == 0:
+                self.layer_wise_pre_swap_area_block_hashes = {
+                    blk_id: blk_hash
+                    for (blk_id, blk_hash) in zip(
+                        candidate_swap_vllm_block_ids, retrieve_result_hash_list
+                    )
+                }
+            else:
+                already_matched_record = {}
+                for logic_blk_id in candidate_swap_vllm_block_ids:
+                    if (
+                        logic_blk_id in self.layer_wise_pre_swap_area_block_hashes
+                        and self.layer_wise_pre_swap_area_block_hashes[logic_blk_id]
+                        in retrieve_result_hash_list
+                    ):
+                        already_matched_record[logic_blk_id] = (
+                            self.layer_wise_pre_swap_area_block_hashes[logic_blk_id]
+                        )
+                        candidate_swap_vllm_block_ids.remove(logic_blk_id)
+                        retrieve_result_hash_list.remove(
+                            already_matched_record[logic_blk_id]
+                        )
+                self.layer_wise_pre_swap_area_block_hashes = already_matched_record
+                for diff_blk_id, diff_blk_hash in zip(
+                    candidate_swap_vllm_block_ids, retrieve_result_hash_list
+                ):
+                    self.layer_wise_pre_swap_area_block_hashes[diff_blk_id] = (
+                        diff_blk_hash
+                    )
             if len(retrieve_result_hash_list) > 0:
                 self.launch_transfer_task(
                     "load", retrieve_result_hash_list, candidate_swap_vllm_block_ids
@@ -616,16 +649,14 @@ def __init__(self, vllm_config: VllmConfig, role: UcmSparseRole):
         )
         if self.role == UcmSparseRole.WORKER:
             ratio = 0.75
-            numa_nodes_num, alloc_numa_ids, phy_cpu_core_per_numa = bind_cpus(
+            bind_info_list, alloc_numa_ids = get_bind_cpus_for_rank(
                 self.total_tp_size, self.local_tp_rank, ratio=ratio
            )

             cpu_device = kvstar_retrieve.CPU
             param = kvstar_retrieve.SetupParam(
                 cpuNumaIds=alloc_numa_ids,
-                physicalCorePerNuma=phy_cpu_core_per_numa,
-                allocRatio=ratio,
-                blkRepreSize=4096,
+                bindInfo=bind_info_list,
                 deviceType=cpu_device,
                 totalTpSize=self.total_tp_size,
                 localRankId=self.local_tp_rank,
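
The load_retrieve_result_async hunk above is the "delta kvcache swap" half of this commit: the worker remembers which (vLLM block id -> block hash) pairs already sit in the layer-wise pre-swap area, drops the matches from both transfer lists, and hands launch_transfer_task only the blocks that actually changed since the previous step. Below is a minimal standalone sketch of that bookkeeping; the function name delta_swap and its argument names are hypothetical, and plain dicts/lists stand in for the class state used in multistep.py.

    def delta_swap(pre_swap, candidate_block_ids, retrieve_hashes):
        """Return the updated swap-area map plus the ids/hashes that still need loading."""
        to_load_ids = list(candidate_block_ids)
        to_load_hashes = list(retrieve_hashes)
        matched = {}
        for blk_id in candidate_block_ids:
            blk_hash = pre_swap.get(blk_id)
            if blk_hash is not None and blk_hash in to_load_hashes:
                matched[blk_id] = blk_hash       # already resident in the swap area
                to_load_ids.remove(blk_id)       # no need to transfer it again
                to_load_hashes.remove(blk_hash)
        # whatever remains is the delta that actually gets transferred,
        # and it is recorded so the next step can skip it in turn
        for blk_id, blk_hash in zip(to_load_ids, to_load_hashes):
            matched[blk_id] = blk_hash
        return matched, to_load_ids, to_load_hashes

    # e.g. pre_swap={1: "a", 2: "b"}, candidates=[1, 3], hashes=["a", "c"]
    # -> only block 3 / hash "c" is loaded, and the map becomes {1: "a", 3: "c"}

One deliberate difference: the sketch iterates over copies so that the remove() calls cannot affect the loop, whereas the in-tree loop removes entries from candidate_swap_vllm_block_ids while iterating over it, which skips the element that follows each match.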

ucm/sparse/kvstar/retrieve/core/api/kvstar_retrieve/kvstar_retrieve.cpp

Lines changed: 5 additions & 24 deletions

@@ -7,38 +7,19 @@
 #include "retrieve_task/retrieve_task_manager.h"

 namespace KVStar {
-SetupParam::SetupParam(const std::vector<int>& cpuNumaIds, const int physicalCorePerNuma, const float allocRatio, const size_t blkRepreSize,
-                       const DeviceType deviceType, const int totalTpSize, const int localRankId)
-    : cpuNumaIds{cpuNumaIds}, physicalCorePerNuma{physicalCorePerNuma}, allocRatio{allocRatio}, blkRepreSize{blkRepreSize}, deviceType{deviceType},
+SetupParam::SetupParam(const std::vector<int>& cpuNumaIds, const std::vector<std::pair<int, int>>& bindInfo, const DeviceType deviceType, const int totalTpSize, const int localRankId)
+    : cpuNumaIds{cpuNumaIds}, bindInfo{bindInfo}, deviceType{deviceType},
       totalTpSize{totalTpSize}, localRankId{localRankId}
 {
-
-    int coreNumPerNumaAlloc = static_cast<int>(this->physicalCorePerNuma * this->allocRatio);
-
-    this->perNumaCoreIds.clear();
-    this->perNumaCoreIds.reserve(this->cpuNumaIds.size());
-
-    for (const int numaId : this->cpuNumaIds) {
-        int startCoreId = numaId * this->physicalCorePerNuma;
-
-        std::vector<int> curNumaCoreIdAlloc(coreNumPerNumaAlloc);
-
-        std::iota(curNumaCoreIdAlloc.begin(), curNumaCoreIdAlloc.end(), startCoreId);
-
-        this->perNumaCoreIds.push_back(curNumaCoreIdAlloc);
-
-        KVSTAR_DEBUG("Alloc core ids {} in numa {}.", curNumaCoreIdAlloc, numaId);
-    }
-
-    this->threadNum = static_cast<int>(coreNumPerNumaAlloc * this->cpuNumaIds.size());
+    this->threadNum = this->bindInfo.size();
     KVSTAR_DEBUG("Successfully configured. Total threads = {}.", this->threadNum);
 }


 int32_t Setup(const SetupParam& param)
 {

-    auto status = Singleton<RetrieveTaskManager>::Instance()->Setup(param.threadNum, param.cpuNumaIds, param.perNumaCoreIds);
+    auto status = Singleton<RetrieveTaskManager>::Instance()->Setup(param.threadNum, param.bindInfo);
     if (status.Failure()) {
         KVSTAR_ERROR("Failed({}) to setup RetrieveTaskManager.", status);
         return status.Underlying();
@@ -53,4 +34,4 @@ int32_t Wait(const size_t taskId) {
 }


-}
+}

ucm/sparse/kvstar/retrieve/core/api/kvstar_retrieve/kvstar_retrieve.h

Lines changed: 3 additions & 6 deletions

@@ -13,16 +13,13 @@ namespace KVStar {

 struct SetupParam {
     std::vector<int> cpuNumaIds;
-    int physicalCorePerNuma;
-    float allocRatio;
-    size_t blkRepreSize;
+    std::vector<std::pair<int, int>> bindInfo; // coreId, numaId
     DeviceType deviceType;
     int totalTpSize;
     int localRankId;
-    std::vector<std::vector<int>> perNumaCoreIds;
     int threadNum;

-    SetupParam(const std::vector<int>& cpuNumaIds, const int physicalCorePerNuma, const float allocRatio, const size_t blkRepreSize,
+    SetupParam(const std::vector<int>& cpuNumaIds, const std::vector<std::pair<int, int>>& bindInfo,
               const DeviceType deviceType, const int totalTpSize, const int localRankId);

 };
@@ -36,4 +33,4 @@ int32_t Wait(const size_t taskId);



-#endif //KVSTAR_RETRIEVE_CLIB_KVSTAR_RETRIEVE_H
+#endif //KVSTAR_RETRIEVE_CLIB_KVSTAR_RETRIEVE_H

ucm/sparse/kvstar/retrieve/core/domain/retrieve_task/retrieve_task_manager.cpp

Lines changed: 8 additions & 30 deletions

@@ -2,47 +2,25 @@
 #include "retrieve_task_manager.h"

 namespace KVStar {
-Status RetrieveTaskManager::Setup(const size_t threadNum, const std::vector<int>& cpuNumaIds, const std::vector<std::vector<int>>& bindCoreId) {
+Status RetrieveTaskManager::Setup(const size_t threadNum, const std::vector<std::pair<int, int>>& bindInfo) {

-    const size_t numaNodeCount = cpuNumaIds.size();
-    if (numaNodeCount == 0) {
-        KVSTAR_ERROR("Retrieve task manager get error numa id info {}.", cpuNumaIds);
+    if (threadNum != bindInfo.size()) {
+        KVSTAR_ERROR("Thread count ({}) does not match the size of bind-core-ID list ({}).", threadNum, bindInfo.size());
         return Status::InvalidParam();
     }

-    if (threadNum % numaNodeCount != 0) {
-        KVSTAR_ERROR("Retrieve task manager can not split threads into each numa, thread num {}, numa id info {}.", threadNum, cpuNumaIds);
-        return Status::InvalidParam();
-    }
-
-    if (bindCoreId.size() != numaNodeCount) {
-        KVSTAR_ERROR("Bind core ids {} can not match numa id info {}.", bindCoreId, cpuNumaIds);
-        return Status::InvalidParam();
-    }
-
-    const size_t threadsPerNuma = threadNum / numaNodeCount;
-
     this->_queues.reserve(threadNum);
     for (size_t i = 0; i < threadNum; ++i) {
-        const size_t numaListIndex = i / threadsPerNuma;
-
-        const size_t coreListIndex = i % threadsPerNuma;
-
-        if (coreListIndex >= bindCoreId[numaListIndex].size()) {
-            KVSTAR_ERROR("Bind core ids {} can not alloc per numa need alloc threads num {}.", bindCoreId, threadsPerNuma);
-            return Status::InvalidParam();
-        }
-
-        const int targetNumaId = cpuNumaIds[numaListIndex];
-        const int targetCoreId = bindCoreId[numaListIndex][coreListIndex];
+        const int targetCoreId = bindInfo[i].first;
+        const int targetNumaId = bindInfo[i].second;

         auto& queue = this->_queues.emplace_back(std::make_unique<RetrieveTaskQueue>());
         auto status = queue->Setup(targetNumaId, targetCoreId, &this->_failureSet);
         if (status.Failure()) {
-            KVSTAR_ERROR("Init and setup thread id {} in pool failed.", i);
+            KVSTAR_ERROR("Init and setup thread id {} (to core {}) in pool failed.", i, targetCoreId);
             return status;
         }
-        KVSTAR_DEBUG("Init and setup thread id {} in pool success.", i);
+        KVSTAR_DEBUG("Init and setup thread id {} in pool to core {} success.", i, targetCoreId);
     }
     return Status::OK();
 }
@@ -106,4 +84,4 @@ Status RetrieveTaskManager::GetResult(size_t taskId, std::shared_ptr<TaskResult>
 }


-}
+}

ucm/sparse/kvstar/retrieve/core/domain/retrieve_task/retrieve_task_manager.h

Lines changed: 2 additions & 2 deletions

@@ -10,7 +10,7 @@
 namespace KVStar {
 class RetrieveTaskManager {
 public:
-    Status Setup(const size_t threadNum, const std::vector<int>& cpuNumaIds, const std::vector<std::vector<int>>& bindCoreId); // important: the entry point that brings up the thread pool
+    Status Setup(const size_t threadNum, const std::vector<std::pair<int, int>>& bindInfo);
     Status SubmitSingleTask(RetrieveTask&&task, size_t &taskId);

     Status GetResult(size_t taskId, std::shared_ptr<TaskResult>& result);
@@ -36,4 +36,4 @@ class RetrieveTaskManager {



-#endif //UCM_SPARSE_KVSTAR_RETRIEVE_RETRIEVE_TASK_MANAGER_H
+#endif //UCM_SPARSE_KVSTAR_RETRIEVE_RETRIEVE_TASK_MANAGER_H

ucm/sparse/kvstar/retrieve/py_intf/py_intf.cpp

Lines changed: 4 additions & 10 deletions

@@ -106,23 +106,17 @@ PYBIND11_MODULE(kvstar_retrieve, module)

     py::class_<KVStar::SetupParam>(module, "SetupParam")
         .def(py::init<const std::vector<int>&,
-                      const int,
-                      const float,
-                      const size_t,
+                      const std::vector<std::pair<int, int>>&,
                       const KVStar::DeviceType,
                       const int,
                       const int>(),
              py::arg("cpuNumaIds"),
-             py::arg("physicalCorePerNuma"),
-             py::arg("allocRatio"),
-             py::arg("blkRepreSize"),
+             py::arg("bindInfo"),
              py::arg("deviceType"),
              py::arg("totalTpSize"),
              py::arg("localRankId"))
         .def_readwrite("cpuNumaIds", &KVStar::SetupParam::cpuNumaIds)
-        .def_readwrite("physicalCorePerNuma", &KVStar::SetupParam::physicalCorePerNuma)
-        .def_readwrite("allocRatio", &KVStar::SetupParam::allocRatio)
-        .def_readwrite("blkRepreSize", &KVStar::SetupParam::blkRepreSize)
+        .def_readwrite("bindInfo", &KVStar::SetupParam::bindInfo)
         .def_readwrite("deviceType", &KVStar::SetupParam::deviceType)
         .def_readwrite("totalTpSize", &KVStar::SetupParam::totalTpSize)
         .def_readwrite("localRankId", &KVStar::SetupParam::localRankId);
@@ -131,4 +125,4 @@ PYBIND11_MODULE(kvstar_retrieve, module)
     module.def("AsyncRetrieveByCPU", &KVStar::AsyncRetrieveByCPU);
     module.def("Wait", &KVStar::Wait);
     module.def("GetTaskResult", &KVStar::GetTaskResult);
-}
+}
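
Seen from Python, the rebound constructor takes the flat list of (coreId, numaId) pairs in place of the three removed scalars. A construction sketch with placeholder values, mirroring the multistep.py hunk above (how the resulting param is passed on to the C++ Setup entry point is outside this hunk):

    from ucm.sparse.kvstar.retrieve import kvstar_retrieve

    # bindInfo carries (coreId, numaId) pairs; pybind11 converts the list of
    # 2-tuples into std::vector<std::pair<int, int>>. Values are placeholders.
    param = kvstar_retrieve.SetupParam(
        cpuNumaIds=[0, 1],
        bindInfo=[(0, 0), (1, 0), (24, 1), (25, 1)],
        deviceType=kvstar_retrieve.CPU,
        totalTpSize=2,
        localRankId=0,
    )
    print(param.bindInfo)  # readable and writable via .def_readwrite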

ucm/sparse/kvstar/utils.py

Lines changed: 112 additions & 0 deletions

@@ -1,3 +1,4 @@
+import collections
 import hashlib
 import pickle
 import subprocess
@@ -103,3 +104,114 @@ def bind_cpus(world_size, rank_id, ratio=0.5):
     print(f"cpu_core_alloc: {cpu_core_alloc}")

     return numa_nodes_num, alloc_numa_ids, phy_cpu_core_per_numa
+
+
+def get_physical_core_topology():
+    """
+    Use `lscpu -e` to parse the exact CPU topology.
+    Returns a dict keyed by numa_id; each value lists the physical core ids on that NUMA node.
+    """
+    # topology[numa_id][core_id] = logical_cpu_id
+    # make sure each physical core is only recorded once
+    topology = collections.defaultdict(dict)
+
+    # execute lscpu -e and split the output into lines,
+    # e.g.: 36 0 0 0 0:0:0:0 yes 3700.0000 1000.0000
+    lscpu_output = execute_command(["lscpu", "-e"]).strip().split("\n")
+
+    # skip the header line
+    for line in lscpu_output[1:]:
+        parts = line.split()
+        if len(parts) < 4:
+            continue
+
+        logical_cpu_id = int(parts[0])
+        numa_id = int(parts[1])
+        core_id = int(parts[3])  # physical core id
+
+        if core_id not in topology[numa_id]:
+            topology[numa_id][core_id] = logical_cpu_id
+
+    final_mapping = {
+        numa_id: list(sorted(cores.values())) for numa_id, cores in topology.items()
+    }
+    return final_mapping
+
+
+def get_bind_cpus_for_rank(world_size, rank_id, ratio=1.0):
+    """
+    Compute the NUMA nodes and (core, numa) bind pairs allocated to a rank.
+
+    Scenarios:
+    1. numa_num >= world_size: divide whole NUMA nodes evenly across ranks.
+    2. numa_num < world_size: divide the total physical cores evenly across ranks.
+    """
+    physical_core_map = get_physical_core_topology()
+    if not physical_core_map:
+        print("Could not determine CPU topology. Aborting bind.")
+        return [], []
+
+    print(f"Detected Physical Core Topology: {physical_core_map}")
+
+    numa_nodes_num = len(physical_core_map)
+    sorted_numa_ids = sorted(physical_core_map.keys())
+
+    bind_info_list = []
+    alloc_numa_ids = []
+
+    numas_per_rank = numa_nodes_num // world_size
+
+    if numas_per_rank > 0:
+        print(f"Strategy: NUMA-level discard binding.")
+
+        discarded_numa_count = numa_nodes_num % world_size
+        if discarded_numa_count > 0:
+            print(
+                f"Note: {discarded_numa_count} NUMA node(s) (IDs: {sorted_numa_ids[-discarded_numa_count:]}) will be unused to ensure fair distribution."
+            )
+
+        start_numa_idx = rank_id * numas_per_rank
+        end_numa_idx = start_numa_idx + numas_per_rank
+
+        alloc_numa_ids = sorted_numa_ids[start_numa_idx:end_numa_idx]
+
+        print(f"Rank {rank_id} allocated to NUMA nodes: {alloc_numa_ids}")
+
+        for numa_id in alloc_numa_ids:
+            physical_cores_on_numa = physical_core_map.get(numa_id, [])
+            cores_to_take = int(len(physical_cores_on_numa) * ratio)
+            for core_id in physical_cores_on_numa[:cores_to_take]:
+                bind_info_list.append((core_id, numa_id))
+
+    else:
+        print(
+            f"Strategy: Fallback to uniform core distribution ({world_size} ranks > {numa_nodes_num} NUMA nodes)."
+        )
+
+        all_physical_cores_with_numa = []
+        for numa_id in sorted_numa_ids:
+            for core_id in physical_core_map[numa_id]:
+                all_physical_cores_with_numa.append((core_id, numa_id))
+
+        total_physical_cores = len(all_physical_cores_with_numa)
+        cores_per_rank = total_physical_cores // world_size
+        if cores_per_rank == 0:
+            print(
+                f"Warning: Not enough physical cores ({total_physical_cores}) to assign at least one to each of the {world_size} ranks. Rank {rank_id} will not be bound to any core."
+            )
+            return [], sorted_numa_ids
+
+        start_core_idx = rank_id * cores_per_rank
+        end_core_idx = start_core_idx + cores_per_rank
+
+        rank_core_share = all_physical_cores_with_numa[start_core_idx:end_core_idx]
+        cores_to_take = int(len(rank_core_share) * ratio)
+        bind_info_list = rank_core_share[:cores_to_take]
+
+        alloc_numa_ids = sorted_numa_ids
+
+    bind_info_list.sort()
+    print(
+        f"Rank {rank_id} will bind to {len(bind_info_list)} (CPU, NUMA) pairs: {bind_info_list}"
+    )
+    return bind_info_list, alloc_numa_ids
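
For a concrete picture of the NUMA-level strategy, here is the split get_bind_cpus_for_rank performs when ratio=1.0, reproduced on a hypothetical 4-node topology with 4 physical cores per node (the dict below merely stands in for the output of get_physical_core_topology; real ids come from lscpu):

    # hypothetical topology: numa_id -> physical core ids on that node
    topology = {0: [0, 1, 2, 3], 1: [4, 5, 6, 7], 2: [8, 9, 10, 11], 3: [12, 13, 14, 15]}

    world_size = 2  # 4 NUMA nodes >= 2 ranks, so each rank gets whole nodes
    numas_per_rank = len(topology) // world_size  # 2
    for rank_id in range(world_size):
        numa_ids = sorted(topology)[rank_id * numas_per_rank:(rank_id + 1) * numas_per_rank]
        bind_info = [(core, numa) for numa in numa_ids for core in topology[numa]]
        print(rank_id, numa_ids, bind_info)
    # rank 0 -> NUMA [0, 1], pairs [(0, 0), (1, 0), (2, 0), (3, 0), (4, 1), ..., (7, 1)]
    # rank 1 -> NUMA [2, 3], pairs [(8, 2), (9, 2), ..., (15, 3)]

With world_size=8 on the same hypothetical machine, numas_per_rank would be 0, so the function falls through to the uniform-core branch and hands each rank 2 of the 16 (core, numa) pairs instead.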
