Skip to content

Commit f1337f8

Browse files
committed
GDS
1 parent 8362969 commit f1337f8

File tree

16 files changed

+562
-12
lines changed

16 files changed

+562
-12
lines changed

ucm/store/device/cuda/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ add_library(Cuda::cudart UNKNOWN IMPORTED)
33
set_target_properties(Cuda::cudart PROPERTIES
44
INTERFACE_INCLUDE_DIRECTORIES "${CUDA_ROOT}/include"
55
IMPORTED_LOCATION "${CUDA_ROOT}/lib64/libcudart.so"
6+
IMPORTED_LOCATION "${CUDA_ROOT}/lib64/libcufile.so"
67
)
78

89
add_library(storedevice STATIC cuda_device.cc)

ucm/store/device/cuda/cuda_device.cc

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@
2424
#include <cuda_runtime.h>
2525
#include "ibuffered_device.h"
2626
#include "logger/logger.h"
27+
#include <cufile.h>
28+
#include <mutex>
29+
#include <fcntl.h>
30+
#include <unistd.h>
31+
#include <cerrno>
32+
#include <cstring>
33+
#include "sharded_handle_recorder.h"
2734

2835
template <>
2936
struct fmt::formatter<cudaError_t> : formatter<int32_t> {
@@ -35,6 +42,28 @@ struct fmt::formatter<cudaError_t> : formatter<int32_t> {
3542

3643
namespace UC {
3744

45+
static Status CreateCuFileHandle(const std::string& path, int flags, CUfileHandle_t& cuFileHandle, int& fd)
46+
{
47+
fd = open(path.c_str(), flags, 0644);
48+
if (fd < 0) {
49+
UC_ERROR("Failed to open file {}: {}", path, strerror(errno));
50+
return Status::Error();
51+
}
52+
53+
CUfileDescr_t cfDescr{};
54+
cfDescr.handle.fd = fd;
55+
cfDescr.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
56+
CUfileError_t err = cuFileHandleRegister(&cuFileHandle, &cfDescr);
57+
if (err.err != CU_FILE_SUCCESS) {
58+
UC_ERROR("Failed to register cuFile handle for {}: error {}",
59+
path, static_cast<int>(err.err));
60+
close(fd);
61+
fd = -1;
62+
return Status::Error();
63+
}
64+
65+
return Status::OK();
66+
}
3867
template <typename Api, typename... Args>
3968
Status CudaApi(const char* caller, const char* file, const size_t line, const char* name, Api&& api,
4069
Args&&... args)
@@ -62,17 +91,32 @@ class CudaDevice : public IBufferedDevice {
6291
c->cb(ret == cudaSuccess);
6392
delete c;
6493
}
94+
static std::once_flag gdsOnce_;
95+
static void InitGdsOnce();
6596

6697
public:
6798
CudaDevice(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber)
6899
: IBufferedDevice{deviceId, bufferSize, bufferNumber}, stream_{nullptr}
69100
{
70101
}
71-
Status Setup() override
102+
~CudaDevice() {
103+
CuFileHandleRecorder::Instance().ClearAll([](CUfileHandle_t h, int fd) {
104+
cuFileHandleDeregister(h);
105+
if (fd >= 0) {
106+
close(fd);
107+
}
108+
});
109+
110+
if (stream_ != nullptr) {
111+
cudaStreamDestroy((cudaStream_t)stream_);
112+
}
113+
}
114+
Status Setup(bool transferUseDirect) override
72115
{
116+
if(transferUseDirect) {InitGdsOnce();}
73117
auto status = Status::OK();
74118
if ((status = CUDA_API(cudaSetDevice, this->deviceId)).Failure()) { return status; }
75-
if ((status = IBufferedDevice::Setup()).Failure()) { return status; }
119+
if ((status = IBufferedDevice::Setup(transferUseDirect)).Failure()) { return status; }
76120
if ((status = CUDA_API(cudaStreamCreate, (cudaStream_t*)&this->stream_)).Failure()) {
77121
return status;
78122
}
@@ -96,6 +140,40 @@ class CudaDevice : public IBufferedDevice {
96140
return CUDA_API(cudaMemcpyAsync, dst, src, count, cudaMemcpyDeviceToHost,
97141
(cudaStream_t)this->stream_);
98142
}
143+
Status S2DSync(const std::string& path, void* address, const size_t length, const size_t file_offset, const size_t dev_offset) override
144+
{
145+
CUfileHandle_t cuFileHandle = nullptr;
146+
auto status = CuFileHandleRecorder::Instance().Get(path, cuFileHandle,
147+
[&path](CUfileHandle_t& handle, int& fd) -> Status {
148+
return CreateCuFileHandle(path, O_RDONLY | O_DIRECT, handle, fd);
149+
});
150+
if (status.Failure()) {
151+
return status;
152+
}
153+
ssize_t bytesRead = cuFileRead(cuFileHandle, address, length, file_offset, dev_offset);
154+
if (bytesRead < 0 || (size_t)bytesRead != length) {
155+
UC_ERROR("cuFileRead failed for {}: expected {}, got {}", path, length, bytesRead);
156+
return Status::Error();
157+
}
158+
return Status::OK();
159+
}
160+
Status D2SSync(const std::string& path, void* address, const size_t length, const size_t file_offset, const size_t dev_offset) override
161+
{
162+
CUfileHandle_t cuFileHandle = nullptr;
163+
auto status = CuFileHandleRecorder::Instance().Get(path, cuFileHandle,
164+
[&path](CUfileHandle_t& handle, int& fd) -> Status {
165+
return CreateCuFileHandle(path, O_WRONLY | O_CREAT | O_DIRECT, handle, fd);
166+
});
167+
if (status.Failure()) {
168+
return status;
169+
}
170+
ssize_t bytesWrite = cuFileWrite(cuFileHandle, address, length, file_offset, dev_offset);
171+
if (bytesWrite < 0 || (size_t)bytesWrite != length) {
172+
UC_ERROR("cuFileWrite failed for {}: expected {}, got {}", path, length, bytesWrite);
173+
return Status::Error();
174+
}
175+
return Status::OK();
176+
}
99177
Status AppendCallback(std::function<void(bool)> cb) override
100178
{
101179
auto* c = new (std::nothrow) Closure(cb);
@@ -140,5 +218,17 @@ std::unique_ptr<IDevice> DeviceFactory::Make(const int32_t deviceId, const size_
140218
return nullptr;
141219
}
142220
}
221+
std::once_flag CudaDevice::gdsOnce_{};
222+
void CudaDevice::InitGdsOnce()
223+
{
224+
std::call_once(gdsOnce_, [] (){
225+
CUfileError_t ret = cuFileDriverOpen();
226+
if (ret.err == CU_FILE_SUCCESS) {
227+
UC_INFO("GDS driver initialized successfully");
228+
} else {
229+
UC_ERROR("GDS driver initialized unsuccessfully");
230+
}
231+
});
232+
}
143233

144234
} // namespace UC
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#ifndef UC_INFRA_SHARDED_HANDLE_RECORDER_H
2+
#define UC_INFRA_SHARDED_HANDLE_RECORDER_H
3+
4+
#include <functional>
5+
#include <string>
6+
#include "status/status.h"
7+
#include <cufile.h>
8+
#include "infra/template/hashmap.h"
9+
10+
namespace UC {
11+
12+
class CuFileHandleRecorder {
13+
private:
14+
struct RecordValue {
15+
CUfileHandle_t handle;
16+
int fd;
17+
uint64_t refCount;
18+
};
19+
using HandleMap = HashMap<std::string, RecordValue, std::hash<std::string>, 10>;
20+
HandleMap handles_;
21+
CuFileHandleRecorder() = default;
22+
CuFileHandleRecorder(const CuFileHandleRecorder&) = delete;
23+
CuFileHandleRecorder& operator=(const CuFileHandleRecorder&) = delete;
24+
25+
public:
26+
static CuFileHandleRecorder& Instance()
27+
{
28+
static CuFileHandleRecorder recorder;
29+
return recorder;
30+
}
31+
32+
Status Get(const std::string& path, CUfileHandle_t& handle,
33+
std::function<Status(CUfileHandle_t&, int&)> instantiate)
34+
{
35+
auto result = handles_.GetOrCreate(path, [&instantiate](RecordValue& value) -> bool {
36+
int fd = -1;
37+
CUfileHandle_t h = nullptr;
38+
39+
auto status = instantiate(h, fd);
40+
if (status.Failure()) {
41+
return false;
42+
}
43+
44+
value.handle = h;
45+
value.fd = fd;
46+
value.refCount = 1;
47+
return true;
48+
});
49+
50+
if (!result.has_value()) {
51+
return Status::Error();
52+
}
53+
54+
auto& recordValue = result.value().get();
55+
recordValue.refCount++;
56+
handle = recordValue.handle;
57+
return Status::OK();
58+
}
59+
60+
void Put(const std::string& path,
61+
std::function<void(CUfileHandle_t)> cleanup)
62+
{
63+
handles_.Upsert(path, [&cleanup](RecordValue& value) -> bool {
64+
value.refCount--;
65+
if (value.refCount > 0) {
66+
return false;
67+
}
68+
cleanup(value.handle);
69+
return true;
70+
});
71+
}
72+
73+
void ClearAll(std::function<void(CUfileHandle_t, int)> cleanup)
74+
{
75+
handles_.ForEach([&cleanup](const std::string& path, RecordValue& value) {
76+
cleanup(value.handle, value.fd);
77+
});
78+
handles_.Clear();
79+
}
80+
};
81+
82+
} // namespace UC
83+
84+
#endif // UC_INFRA_SHARDED_HANDLE_RECORDER_H

ucm/store/device/ibuffered_device.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ class IBufferedDevice : public IDevice {
3535
: IDevice{deviceId, bufferSize, bufferNumber}
3636
{
3737
}
38-
Status Setup() override
38+
Status Setup(bool transferUseDirect) override
3939
{
40+
if(transferUseDirect) {return Status::OK();}
4041
auto totalSize = this->bufferSize * this->bufferNumber;
4142
this->_addr = this->MakeBuffer(totalSize);
4243
if (!this->_addr) { return Status::OutOfMemory(); }

ucm/store/device/idevice.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,20 @@ class IDevice {
3737
{
3838
}
3939
virtual ~IDevice() = default;
40-
virtual Status Setup() = 0;
40+
virtual Status Setup(bool transferUseDirect) = 0;
4141
virtual std::shared_ptr<std::byte> GetBuffer(const size_t size) = 0;
4242
virtual Status H2DSync(std::byte* dst, const std::byte* src, const size_t count) = 0;
4343
virtual Status D2HSync(std::byte* dst, const std::byte* src, const size_t count) = 0;
4444
virtual Status H2DAsync(std::byte* dst, const std::byte* src, const size_t count) = 0;
4545
virtual Status D2HAsync(std::byte* dst, const std::byte* src, const size_t count) = 0;
4646
virtual Status AppendCallback(std::function<void(bool)> cb) = 0;
4747
virtual Status Synchronized() = 0;
48+
virtual Status S2DSync(const std::string& path, void* address, const size_t length, const size_t file_offset, const size_t dev_offset) {
49+
return Status::Unsupported();
50+
}
51+
virtual Status D2SSync(const std::string& path, void* address, const size_t length, const size_t file_offset, const size_t dev_offset) {
52+
return Status::Unsupported();
53+
}
4854

4955
protected:
5056
virtual std::shared_ptr<std::byte> MakeBuffer(const size_t size) = 0;

0 commit comments

Comments
 (0)