
Commit 8622635

[Feat] Add nfsstore bandwidth testing script (#323)

Add bandwidth testing script

1 parent 6512503 commit 8622635

File tree: 2 files changed, +458 -0 lines changed
Lines changed: 282 additions & 0 deletions
@@ -0,0 +1,282 @@
# -*- coding: utf-8 -*-
#
# MIT License
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import csv  # unused in this file; see the persistence sketch after the listing
import os
import secrets
import time
from typing import Dict, List, Tuple

import torch

from ucm.store.nfsstore.nfsstore_connector import UcmNfsStore
from ucm.store.ucmstore import UcmKVStoreBase

def setup(
    storage_backends, block_size, device_id, io_size, transferStreamNumber
) -> UcmKVStoreBase:
    config = {
        "storage_backends": storage_backends,
        "kv_block_size": block_size,
        "role": "worker",
        "device": device_id,
        "io_size": io_size,
        "transferStreamNumber": transferStreamNumber,
    }
    return UcmNfsStore(config)

def make_buffers(
    block_number, device_id, batch_size, head_dim, block_len, block_layer, num_head, kv
):
    hashes = [secrets.token_hex(16) for _ in range(block_number)]
    kv_caches = {}
    for i in range(block_layer):
        kv_caches[i] = torch.rand(
            [kv, block_number, block_len, num_head, head_dim],
            dtype=torch.bfloat16,
            device=f"cuda:{device_id}",
        )
    return hashes, kv_caches

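# With the defaults in __main__ (real_blocks = 32 + 10 = 42), make_buffers()
# allocates 61 bf16 tensors of shape [1, 42, 128, 1, 576] on the GPU:
# 61 * 42 * 128 * 576 * 2 B ~= 360 MiB of device memory.
# (The batch_size argument is accepted but unused.)
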
def store_all_hashes(hashes: List[str]):
    file_path = os.path.join(os.path.dirname(__file__), "kvcache_block_hashes.txt")
    with open(file_path, "w", encoding="utf-8") as f:
        for h in hashes:
            f.write(h + "\n")

def embed(
    store: UcmKVStoreBase,
    hashes: List[str],
    kvcaches: Dict[int, torch.Tensor],
    mla: bool,
):
    start_time = time.perf_counter()

    total_block_ids, total_offsets, total_tensors = [], [], []
    total_size = 0

    for i, hash_val in enumerate(hashes):
        offset = 0
        for layer_id, kv_layer in kvcaches.items():
            k_tensor = kv_layer[0][i]  # kv=1
            total_tensors.append(k_tensor)
            total_block_ids.append(hash_val)
            total_offsets.append(offset)
            sz = k_tensor.numel() * k_tensor.element_size()
            offset += sz
            total_size += sz

            if not mla:
                v_tensor = kv_layer[1][i]
                total_tensors.append(v_tensor)
                total_block_ids.append(hash_val)
                total_offsets.append(offset)
                sz = v_tensor.numel() * v_tensor.element_size()
                offset += sz
                total_size += sz

    task = store.dump(total_block_ids, total_offsets, total_tensors)
    store.wait(task)

    elapsed_time = time.perf_counter() - start_time
    throughput_gbps = (total_size / (1024**3)) / elapsed_time if elapsed_time > 0 else 0

    print(
        f"WRITE: Data Size={(total_size / (1024 ** 3)):.4f} GB, Time={elapsed_time:.4f} s, "
        f"Speed={throughput_gbps:.4f} GB/s"
    )

    return total_size, elapsed_time, throughput_gbps

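# Byte layout embed() produces for one block id (MLA path, kv=1):
#   offset 0               -> layer 0 K tensor (block_len * num_head * head_dim * 2 B)
#   offset io_size         -> layer 1 K tensor
#   ...
#   offset io_size * (L-1) -> layer L-1 K tensor
# Offsets restart at 0 for each block id, so one block occupies exactly
# kv_block_size = io_size * block_layer bytes in the store; the non-MLA path
# appends a V tensor after each K at the next offset.
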
def fetch(
    store: UcmKVStoreBase,
    hashes: List[str],
    kvcaches: Dict[int, torch.Tensor],
    mla: bool,
):
    start_time = time.perf_counter()

    founds = store.lookup(hashes)
    for f in founds:
        assert f, "Cache block miss detected"

    block_ids, offsets, tensors = [], [], []
    total_size = 0

    for i, hash_val in enumerate(hashes):
        offset = 0
        for layer_id, kv_layer in kvcaches.items():
            k_tensor = kv_layer[0][i]  # kv=1
            block_ids.append(hash_val)
            offsets.append(offset)
            tensors.append(k_tensor)
            sz = k_tensor.numel() * k_tensor.element_size()
            offset += sz
            total_size += sz

            if not mla:
                v_tensor = kv_layer[1][i]
                block_ids.append(hash_val)
                offsets.append(offset)
                tensors.append(v_tensor)
                sz = v_tensor.numel() * v_tensor.element_size()
                offset += sz
                total_size += sz

    task = store.load(block_ids, offsets, tensors)
    ret = store.wait(task)
    assert ret == 0, "Load operation failed"

    elapsed_time = time.perf_counter() - start_time
    throughput_gbps = (total_size / (1024**3)) / elapsed_time if elapsed_time > 0 else 0

    print(
        f"READ: Data Size={(total_size / (1024 ** 3)):.4f} GB, Time={elapsed_time:.4f} s, "
        f"Speed={throughput_gbps:.4f} GB/s"
    )

    return total_size, elapsed_time, throughput_gbps

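# Note: fetch() reads back into the same GPU tensors embed() wrote from and only
# checks lookup hits and the load return code; it measures bandwidth, not data
# integrity. Its READ timing also includes the lookup() round trip.
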
def run(
    storage_backends: str,
    device_id: int,
    repeat: int,
    num_head: int,
    block_len: int,
    transferStreamNumber: int,
    num_tokens: int,
    block_layer: int,
    head_size: int,
    block_elem_size: int,
    kv: int,
    mla: bool,
) -> Tuple[float, float, float, float, float, float]:
    """
    Run a single test with the given parameters and return performance metrics.

    Returns:
        Tuple of (avg_w_size, avg_w_time, avg_w_bw, avg_r_time, avg_r_bw, avg_r_size)
    """

    block_dim = head_size * num_head
    io_size = block_dim * block_len * block_elem_size
    block_size = io_size * block_layer
    batch_size = int(num_tokens / block_len)
    real_blocks = batch_size + 10  # allocate spare blocks beyond the tested batch

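    # Worked example with the defaults in __main__ (num_head=1, head_size=576,
    # block_len=128, block_elem_size=2, block_layer=61, num_tokens=4096):
    #   io_size    = 576 * 128 * 2 = 147,456 B   (144 KiB per layer per block)
    #   block_size = 147,456 * 61  = 8,994,816 B (~8.58 MiB per block)
    #   batch_size = 4096 / 128    = 32 blocks
    # so each round writes and then reads back ~0.268 GiB.
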
    w_bw_list, r_bw_list = [], []
    w_time_list, r_time_list = [], []
    w_size_sum, r_size_sum = 0.0, 0.0

    store = setup(
        storage_backends, block_size, device_id, io_size, transferStreamNumber
    )
    for r in range(repeat):
        print(f"\n--- Round {r+1} ---")

        hashes, kvcaches = make_buffers(
            real_blocks,
            device_id,
            batch_size,
            head_size,
            block_len,
            block_layer,
            num_head,
            kv,
        )

        results = store.create(hashes[:batch_size])
        assert sum(results) == 0, "Create operation failed"

        w_size, w_time, w_bw = embed(
            store,
            hashes[:batch_size],
            kvcaches,
            mla,
        )
        store.commit(hashes[:batch_size], True)

        store_all_hashes(hashes[:batch_size])

        r_size, r_time, r_bw = fetch(
            store,
            hashes[:batch_size],
            kvcaches,
            mla,
        )

        w_bw_list.append(w_bw)
        r_bw_list.append(r_bw)
        w_time_list.append(w_time)
        r_time_list.append(r_time)
        w_size_sum += w_size
        r_size_sum += r_size

        # Clean up resources
        del kvcaches, hashes
        torch.cuda.empty_cache()

    del store
    avg_w_bw = sum(w_bw_list) / repeat
    avg_r_bw = sum(r_bw_list) / repeat
    avg_w_time = sum(w_time_list) / repeat
    avg_r_time = sum(r_time_list) / repeat
    avg_w_size = w_size_sum / (1024**3) / repeat
    avg_r_size = r_size_sum / (1024**3) / repeat

    return avg_w_size, avg_w_time, avg_w_bw, avg_r_time, avg_r_bw, avg_r_size

if __name__ == "__main__":
    os.environ["UC_LOGGER_LEVEL"] = "debug"

    try:
        result = run(
            storage_backends="/home/nfs/zht_data",
            device_id=1,
            repeat=1,
            num_head=1,
            block_len=128,
            transferStreamNumber=32,
            num_tokens=4096,
            block_layer=61,
            head_size=576,
            block_elem_size=2,
            kv=1,
            mla=True,
        )

        avg_w_size, avg_w_time, avg_w_bw, avg_r_time, avg_r_bw, avg_r_size = result

    except Exception as e:
        print(f"Error: {e}")
        import traceback

        traceback.print_exc()
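
The file imports csv but never persists the averaged metrics. Below is a minimal sketch of that missing step, meant to sit right after the tuple unpack inside the try block; the results.csv name and the column headers are illustrative assumptions, not part of this commit:

        # Hypothetical persistence step (assumed, not in this commit): append
        # the averaged metrics to a CSV file next to the script.
        csv_path = os.path.join(os.path.dirname(__file__), "results.csv")
        new_file = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
        with open(csv_path, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if new_file:  # write the header once for a fresh file
                writer.writerow(
                    ["w_size_gib", "w_time_s", "w_bw_gibps",
                     "r_time_s", "r_bw_gibps", "r_size_gib"]
                )
            writer.writerow([f"{v:.4f}" for v in result])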
