Skip to content

Commit 9689f99

Browse files
author
Omar Marzouk
committed
Asynchronous read ahed
1 parent 0858a52 commit 9689f99

File tree

3 files changed

+41
-60
lines changed

3 files changed

+41
-60
lines changed

tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ typedef struct DFSRandomAccessFile {
2020
daos_file.file = obj;
2121
dfs_get_size(daos_fs, obj, &file_size);
2222
for(size_t i = 0; i < num_of_buffers; i++) {
23-
buffers.push_back(ReadBuffer(eqh, BUFF_SIZE));
23+
buffers.push_back(ReadBuffer(i, eqh, BUFF_SIZE));
2424
}
2525
}
2626
} DFSRandomAccessFile;
@@ -39,47 +39,20 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
3939
char* ret, TF_Status* status) {
4040
int rc = 0;
4141
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
42-
int counter = 0;
4342
for(auto& read_buf: dfs_file->buffers) {
44-
if(read_buf.CacheHit(offset, n)){
45-
rc = read_buf.CopyData(ret, offset, n);
46-
if (rc) {
47-
TF_SetStatus(status, TF_INTERNAL, "");
48-
return 0;
49-
}
50-
51-
if (offset + n > dfs_file->file_size) {
52-
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
53-
return dfs_file->file_size - offset;
54-
}
55-
56-
TF_SetStatus(status, TF_OK, "");
57-
return n;
58-
}
59-
counter++;
43+
if(read_buf.CacheHit(offset, n))
44+
return read_buf.CopyFromCache(ret, offset, n, dfs_file->file_size, status);
6045
}
6146

6247
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file, offset);
63-
rc = dfs_file->buffers[0].CopyData(ret, offset, n);
6448
size_t curr_offset = offset + BUFF_SIZE;
6549
for(size_t i = 1; i < dfs_file->buffers.size(); i++) {
6650
if(curr_offset > dfs_file->file_size) break;
67-
dfs_file->buffers[i].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file, curr_offset);
51+
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs, dfs_file->daos_file.file, curr_offset);
6852
curr_offset += BUFF_SIZE;
6953
}
7054

71-
if (rc) {
72-
TF_SetStatus(status, TF_INTERNAL, "");
73-
return 0;
74-
}
75-
76-
if (offset + n > dfs_file->file_size) {
77-
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
78-
return dfs_file->file_size - offset;
79-
}
80-
81-
TF_SetStatus(status, TF_OK, "");
82-
return n;
55+
return dfs_file->buffers[0].CopyFromCache(ret, offset, n, dfs_file->file_size, status);
8356
}
8457

8558
} // namespace tf_random_access_file
@@ -216,7 +189,7 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
216189
}
217190
auto random_access_file = new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, daos->mEventQueueHandle,
218191
obj, NUM_OF_BUFFERS);
219-
random_access_file->buffers[0].ReadSync(daos->daos_fs, random_access_file->daos_file.file, 0);
192+
random_access_file->buffers[0].ReadAsync(daos->daos_fs, random_access_file->daos_file.file, 0);
220193
file->plugin_file = random_access_file;
221194
TF_SetStatus(status, TF_OK, "");
222195
}
@@ -695,4 +668,4 @@ void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) {
695668

696669
} // namespace dfs
697670
} // namespace io
698-
} // namespace tensorflow
671+
} // namespace tensorflow

tensorflow_io/core/filesystems/dfs/dfs_utils.cc

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ void DFS::Teardown() {
113113
do {
114114
ret = daos_eq_poll(mEventQueueHandle, 1, -1, 1, &(temp_event));
115115
} while(ret == 1);
116-
daos_eq_destroy(mEventQueueHandle, 0);
116+
int rc = daos_eq_destroy(mEventQueueHandle, 0);
117117
Unmount();
118118
ClearConnections();
119119
}
@@ -425,19 +425,22 @@ int DFS::DisconnectContainer(std::string pool_string, std::string cont_string) {
425425
return rc;
426426
}
427427

428-
ReadBuffer::ReadBuffer(daos_handle_t eqh, size_t size): buffer_size(size), eqh(eqh) {
428+
ReadBuffer::ReadBuffer(size_t id, daos_handle_t eqh, size_t size): id(id), buffer_size(size), eqh(eqh) {
429429
buffer = new char[size];
430430
buffer_offset = 0;
431431
event = new daos_event_t;
432-
daos_event_init(event, eqh,nullptr);
432+
int rc = daos_event_init(event, eqh,nullptr);
433+
if(rc) std::cout << "Failed to init" << std::endl;
433434
valid = false;
434435
}
435436

436437
ReadBuffer::~ReadBuffer() {
437438
if(event != nullptr) {
438439
bool event_status;
439-
daos_event_test(event, 0, &event_status);
440-
daos_event_fini(event);
440+
int rc = daos_event_test(event, 0, &event_status);
441+
if(rc) std::cout << "Failed to Test while destroying" << std::endl;
442+
rc = daos_event_fini(event);
443+
if(rc) std::cout << "Failed to Finalize" << std::endl;
441444
}
442445
delete [] buffer;
443446
delete event;
@@ -449,22 +452,25 @@ ReadBuffer::ReadBuffer(ReadBuffer&& read_buffer) {
449452
buffer = std::move(read_buffer.buffer);
450453
event = std::move(read_buffer.event);
451454
buffer_offset = 0;
455+
id = read_buffer.id;
452456
valid = false;
453457
read_buffer.buffer = nullptr;
454458
read_buffer.event = nullptr;
455459
}
456460

457461
bool
458-
ReadBuffer::CacheHit(size_t pos, size_t len) {
459-
return pos >= buffer_offset && len < buffer_size && (pos+len <= buffer_offset + buffer_size);
462+
ReadBuffer::CacheHit(const size_t pos, const size_t len) {
463+
return pos >= buffer_offset && len <= buffer_size && (pos+len <= buffer_offset + buffer_size);
460464
}
461465

462466
int
463467
ReadBuffer::WaitEvent() {
464-
if(valid) return 0;
468+
if(valid) return 0;
465469
bool event_status;
466-
daos_event_test(event, -1, &event_status);
470+
int rc = daos_event_test(event, -1, &event_status);
471+
if(rc) std::cout << "Failed to Wait" << std::endl;
467472
if(event_status) {
473+
valid = true;
468474
return 0;
469475
}
470476
return -1;
@@ -473,39 +479,38 @@ ReadBuffer::WaitEvent() {
473479
int
474480
ReadBuffer::AbortEvent() {
475481
bool event_status = false;
476-
daos_event_test(event, 0, &event_status);
482+
int rc = daos_event_test(event, 0, &event_status);
483+
if(rc) std::cout << "Failed to check event status" << std::endl;
477484
if(!event_status)
478485
return daos_event_abort(event);
479486
else
480487
return 0;
481488
}
482489

483490
int
484-
ReadBuffer::ReadAsync(dfs_t* daos_fs, dfs_obj_t* file, size_t off) {
491+
ReadBuffer::ReadAsync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
485492
int rc = AbortEvent();
486493
if(rc) return rc;
487494
d_sg_list_t rsgl;
488495
d_iov_t iov;
489496
d_iov_set(&iov, (void*)buffer, buffer_size);
490497
rsgl.sg_nr = 1;
491498
rsgl.sg_iovs = &iov;
492-
daos_size_t read_size;
493499
valid = false;
494500
buffer_offset = off;
495501
dfs_read(daos_fs, file, &rsgl,
496502
buffer_offset, &read_size, event);
497503
}
498504

499505
int
500-
ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, size_t off) {
506+
ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
501507
int rc = AbortEvent();
502508
if(rc) return rc;
503509
d_sg_list_t rsgl;
504510
d_iov_t iov;
505511
d_iov_set(&iov, (void*)buffer, buffer_size);
506512
rsgl.sg_nr = 1;
507513
rsgl.sg_iovs = &iov;
508-
daos_size_t read_size;
509514
valid = false;
510515
buffer_offset = off;
511516
rc = dfs_read(daos_fs, file, &rsgl,
@@ -515,27 +520,29 @@ ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, size_t off) {
515520
}
516521

517522
int
518-
ReadBuffer::CopyData(char* ret, size_t off, size_t n) {
523+
ReadBuffer::CopyData(char* ret, const size_t off, const size_t n) {
519524
int rc = WaitEvent();
520525
if(rc) return rc;
521-
memcpy(ret, buffer + (off - buffer_offset), n);
526+
memcpy(ret, buffer + (off - buffer_offset), n);
522527
return 0;
523528
}
524529

525530
int
526-
ReadBuffer::CopyFromCache(char* ret, size_t off, size_t n, daos_size_t file_size, TF_Status* status){
527-
int rc = CopyData(ret, off, n);
531+
ReadBuffer::CopyFromCache(char* ret, const size_t off, const size_t n, const daos_size_t file_size, TF_Status* status){
532+
size_t read_size;
533+
read_size = off + n > file_size? file_size - off : n;
534+
int rc = CopyData(ret, off, read_size);
528535
if (rc) {
529536
TF_SetStatus(status, TF_INTERNAL, "");
530537
return 0;
531538
}
532539

533540
if (off + n > file_size) {
534541
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
535-
return file_size - off;
542+
return read_size;
536543
}
537544

538545
TF_SetStatus(status, TF_OK, "");
539-
return n;
546+
return read_size;
540547
}
541548

tensorflow_io/core/filesystems/dfs/dfs_utils.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,14 @@ void CopyEntries(char*** entries, std::vector<std::string>& results);
206206

207207
class ReadBuffer {
208208
public:
209-
ReadBuffer(daos_handle_t eqh, size_t size);
209+
ReadBuffer(size_t id, daos_handle_t eqh, size_t size);
210210

211211
ReadBuffer(ReadBuffer&&);
212212

213213
~ReadBuffer();
214214

215215
bool
216-
CacheHit(size_t pos, size_t off);
216+
CacheHit(const size_t pos, const size_t off);
217217

218218
int
219219
WaitEvent();
@@ -222,18 +222,19 @@ class ReadBuffer {
222222
AbortEvent();
223223

224224
int
225-
ReadAsync(dfs_t* dfs, dfs_obj_t* file, size_t off);
225+
ReadAsync(dfs_t* dfs, dfs_obj_t* file, const size_t off);
226226

227227
int
228-
ReadSync(dfs_t* dfs, dfs_obj_t* file, size_t off);
228+
ReadSync(dfs_t* dfs, dfs_obj_t* file, const size_t off);
229229

230230
int
231-
CopyData(char* ret, size_t off, size_t n);
231+
CopyData(char* ret, const size_t offset, const size_t n);
232232

233233
int
234-
CopyFromCache(char* ret, size_t off, size_t n, daos_size_t file_size, TF_Status* status);
234+
CopyFromCache(char* ret, const size_t off, const size_t n, const daos_size_t file_size, TF_Status* status);
235235

236236
private:
237+
size_t id;
237238
char* buffer;
238239
size_t buffer_offset;
239240
size_t buffer_size;

0 commit comments

Comments
 (0)