Skip to content

Commit 7488654

Browse files
author
Omar Marzouk
committed
Finalize Read Ahead
1 parent db74d80 commit 7488654

File tree

3 files changed

+84
-34
lines changed

3 files changed

+84
-34
lines changed

tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,31 @@ typedef struct DFSRandomAccessFile {
1414
std::vector<ReadBuffer> buffers;
1515
daos_size_t file_size;
1616
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, daos_handle_t eqh,
17-
dfs_obj_t* obj, size_t num_of_buffers)
17+
dfs_obj_t* obj)
1818
: dfs_path(std::move(dfs_path)) {
1919
daos_fs = file_system;
2020
daos_file.file = obj;
2121
dfs_get_size(daos_fs, obj, &file_size);
22+
size_t num_of_buffers;
23+
size_t buff_size;
24+
25+
if(char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
26+
num_of_buffers = atoi(env_num_of_buffers);
27+
}
28+
else {
29+
num_of_buffers = NUM_OF_BUFFERS;
30+
}
31+
32+
if(char* env_buff_size = std::getenv("TF_IO_DAOS_BUFFER_SIZE")) {
33+
buff_size = GetStorageSize(env_buff_size);
34+
}
35+
else {
36+
buff_size = BUFF_SIZE;
37+
}
38+
std::cout << buff_size / 1024 / 1024 << std::endl;
39+
std::cout << num_of_buffers << std::endl;
2240
for(size_t i = 0; i < num_of_buffers; i++) {
23-
buffers.push_back(ReadBuffer(i, eqh, BUFF_SIZE));
41+
buffers.push_back(ReadBuffer(i, eqh, buff_size));
2442
}
2543
}
2644
} DFSRandomAccessFile;
@@ -37,21 +55,21 @@ void Cleanup(TF_RandomAccessFile* file) {
3755

3856
int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
3957
char* ret, TF_Status* status) {
40-
int rc = 0;
4158
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
4259
for(auto& read_buf: dfs_file->buffers) {
4360
if(read_buf.CacheHit(offset, n))
4461
return read_buf.CopyFromCache(ret, offset, n, dfs_file->file_size, status);
4562
}
4663

47-
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file, offset);
4864
size_t curr_offset = offset + BUFF_SIZE;
4965
for(size_t i = 1; i < dfs_file->buffers.size(); i++) {
5066
if(curr_offset > dfs_file->file_size) break;
5167
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs, dfs_file->daos_file.file, curr_offset);
5268
curr_offset += BUFF_SIZE;
5369
}
5470

71+
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file, offset);
72+
5573
return dfs_file->buffers[0].CopyFromCache(ret, offset, n, dfs_file->file_size, status);
5674
}
5775

@@ -157,7 +175,6 @@ void NewFile(const TF_Filesystem* filesystem, const char* path, mode_t mode,
157175
std::string pool, cont, file_path;
158176
rc = daos->Setup(path, pool, cont, file_path, status);
159177
if (rc) return;
160-
161178
daos->dfsNewFile(file_path, mode, flags, obj, status);
162179
}
163180

@@ -188,7 +205,7 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
188205
return;
189206
}
190207
auto random_access_file = new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, daos->mEventQueueHandle,
191-
obj, NUM_OF_BUFFERS);
208+
obj);
192209
random_access_file->buffers[0].ReadAsync(daos->daos_fs, random_access_file->daos_file.file, 0);
193210
file->plugin_file = random_access_file;
194211
TF_SetStatus(status, TF_OK, "");

tensorflow_io/core/filesystems/dfs/dfs_utils.cc

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#include "tensorflow_io/core/filesystems/dfs/dfs_utils.h"
22

3-
std::string FormatStorageSize(uint64_t size) {
3+
std::string GetStorageString(uint64_t size) {
44
if (size < KILO) {
55
return std::to_string(size);
66
} else if (size < MEGA) {
@@ -14,18 +14,55 @@ std::string FormatStorageSize(uint64_t size) {
1414
}
1515
}
1616

17+
size_t GetStorageSize(std::string size) {
18+
char size_char = size.back();
19+
size_t curr_scale = 1;
20+
switch (size_char)
21+
{
22+
case 'K':
23+
size.pop_back();
24+
curr_scale *= 1024;
25+
return (size_t) atoi(size.c_str()) * curr_scale;
26+
break;
27+
case 'M':
28+
size.pop_back();
29+
curr_scale *= 1024 * 1024;
30+
return (size_t) atoi(size.c_str()) * curr_scale;
31+
break;
32+
case 'G':
33+
size.pop_back();
34+
curr_scale *= 1024 * 1024 * 1024;
35+
return (size_t) atoi(size.c_str()) * curr_scale;
36+
break;
37+
case 'T':
38+
size.pop_back();
39+
curr_scale *= 1024 * 1024 * 1024;
40+
return (size_t) atoi(size.c_str()) * curr_scale * 1024;
41+
break;
42+
default:
43+
return atoi(size.c_str());
44+
break;
45+
}
46+
}
47+
1748
int ParseDFSPath(const std::string& path, std::string& pool_string,
1849
std::string& cont_string, std::string& filename) {
1950
size_t pool_start = path.find("://") + 3;
2051
struct duns_attr_t* attr =
2152
(struct duns_attr_t*)malloc(sizeof(struct duns_attr_t));
2253
attr->da_rel_path = NULL;
23-
attr->da_flags = 0;
24-
attr->da_no_prefix = false;
25-
std::string direct_path = "daos://" + path.substr(pool_start);
54+
attr->da_flags = 1;
55+
attr->da_no_prefix = true;
56+
std::string direct_path = "/" + path.substr(pool_start);
2657
int rc = duns_resolve_path(direct_path.c_str(), attr);
27-
if (rc) {
28-
return rc;
58+
if (rc == 2) {
59+
attr->da_rel_path = NULL;
60+
attr->da_flags = 0;
61+
attr->da_no_prefix = false;
62+
direct_path = "daos://" + path.substr(pool_start);
63+
rc = duns_resolve_path(direct_path.c_str(), attr);
64+
if(rc)
65+
return rc;
2966
}
3067
pool_string = attr->da_pool;
3168
cont_string = attr->da_cont;
@@ -113,7 +150,7 @@ void DFS::Teardown() {
113150
do {
114151
ret = daos_eq_poll(mEventQueueHandle, 1, -1, 1, &(temp_event));
115152
} while(ret == 1);
116-
int rc = daos_eq_destroy(mEventQueueHandle, 0);
153+
daos_eq_destroy(mEventQueueHandle, 0);
117154
Unmount();
118155
ClearConnections();
119156
}
@@ -198,15 +235,15 @@ int DFS::Query() {
198235
<< std::endl;
199236
std::cout << "- SCM:" << std::endl;
200237
std::cout << " Total size: "
201-
<< FormatStorageSize(pool_info.pi_space.ps_space.s_total[0]);
238+
<< GetStorageString(pool_info.pi_space.ps_space.s_total[0]);
202239
std::cout << " Free: "
203-
<< FormatStorageSize(pool_info.pi_space.ps_space.s_free[0])
240+
<< GetStorageString(pool_info.pi_space.ps_space.s_free[0])
204241
<< std::endl;
205242
std::cout << "- NVMe:" << std::endl;
206243
std::cout << " Total size: "
207-
<< FormatStorageSize(pool_info.pi_space.ps_space.s_total[1]);
244+
<< GetStorageString(pool_info.pi_space.ps_space.s_total[1]);
208245
std::cout << " Free: "
209-
<< FormatStorageSize(pool_info.pi_space.ps_space.s_free[1])
246+
<< GetStorageString(pool_info.pi_space.ps_space.s_free[1])
210247
<< std::endl;
211248
std::cout << std::endl
212249
<< "Connected Container: " << container.first << std::endl;
@@ -429,18 +466,15 @@ ReadBuffer::ReadBuffer(size_t id, daos_handle_t eqh, size_t size): id(id), buffe
429466
buffer = new char[size];
430467
buffer_offset = 0;
431468
event = new daos_event_t;
432-
int rc = daos_event_init(event, eqh,nullptr);
433-
if(rc) std::cout << "Failed to init" << std::endl;
469+
daos_event_init(event, eqh,nullptr);
434470
valid = false;
435471
}
436472

437473
ReadBuffer::~ReadBuffer() {
438474
if(event != nullptr) {
439475
bool event_status;
440-
int rc = daos_event_test(event, 0, &event_status);
441-
if(rc) std::cout << "Failed to Test while destroying" << std::endl;
442-
rc = daos_event_fini(event);
443-
if(rc) std::cout << "Failed to Finalize" << std::endl;
476+
daos_event_test(event, 0, &event_status);
477+
daos_event_fini(event);
444478
}
445479
delete [] buffer;
446480
delete event;
@@ -467,8 +501,7 @@ int
467501
ReadBuffer::WaitEvent() {
468502
if(valid) return 0;
469503
bool event_status;
470-
int rc = daos_event_test(event, -1, &event_status);
471-
if(rc) std::cout << "Failed to Wait" << std::endl;
504+
daos_event_test(event, -1, &event_status);
472505
if(event_status) {
473506
valid = true;
474507
return 0;
@@ -479,8 +512,7 @@ ReadBuffer::WaitEvent() {
479512
int
480513
ReadBuffer::AbortEvent() {
481514
bool event_status = false;
482-
int rc = daos_event_test(event, 0, &event_status);
483-
if(rc) std::cout << "Failed to check event status" << std::endl;
515+
daos_event_test(event, 0, &event_status);
484516
if(!event_status)
485517
return daos_event_abort(event);
486518
else
@@ -491,23 +523,20 @@ int
491523
ReadBuffer::ReadAsync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
492524
int rc = AbortEvent();
493525
if(rc) return rc;
494-
d_sg_list_t rsgl;
495-
d_iov_t iov;
496526
d_iov_set(&iov, (void*)buffer, buffer_size);
497527
rsgl.sg_nr = 1;
498528
rsgl.sg_iovs = &iov;
499529
valid = false;
500530
buffer_offset = off;
501531
dfs_read(daos_fs, file, &rsgl,
502532
buffer_offset, &read_size, event);
533+
return 0;
503534
}
504535

505536
int
506537
ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
507538
int rc = AbortEvent();
508539
if(rc) return rc;
509-
d_sg_list_t rsgl;
510-
d_iov_t iov;
511540
d_iov_set(&iov, (void*)buffer, buffer_size);
512541
rsgl.sg_nr = 1;
513542
rsgl.sg_iovs = &iov;

tensorflow_io/core/filesystems/dfs/dfs_utils.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
#define CONT_START 43
1010
#define PATH_START 80
1111
#define STACK 24
12-
#define NUM_OF_BUFFERS 2
13-
#define BUFF_SIZE 10*1024*1024
12+
#define NUM_OF_BUFFERS 256
13+
#define BUFF_SIZE 4*1024*1024
1414

1515
#include <daos.h>
1616
#include <daos_fs.h>
@@ -130,7 +130,9 @@ typedef std::pair<std::string, daos_handle_t> id_handle_t;
130130
// enum used while wildcard matching
131131
enum Children_Status { NON_MATCHING, MATCHING_DIR, OK };
132132

133-
std::string FormatStorageSize(uint64_t size);
133+
std::string GetStorageString(uint64_t size);
134+
135+
size_t GetStorageSize(std::string size);
134136

135137
// parse DFS path in the format of dfs://<pool_uuid>/<cont_uuid>/<filename>
136138
int ParseDFSPath(const std::string& path, std::string& pool_string,
@@ -240,6 +242,8 @@ class ReadBuffer {
240242
size_t buffer_size;
241243
daos_handle_t eqh;
242244
daos_event_t* event;
245+
d_sg_list_t rsgl;
246+
d_iov_t iov;
243247
bool valid;
244248
daos_size_t read_size;
245249
};

0 commit comments

Comments
 (0)