Skip to content

Commit dbf0bce

Browse files
author
Omar Marzouk
committed
Linting
1 parent 7488654 commit dbf0bce

File tree

3 files changed

+123
-138
lines changed

3 files changed

+123
-138
lines changed

tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc

100755100644
Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,39 +13,37 @@ typedef struct DFSRandomAccessFile {
1313
DAOS_FILE daos_file;
1414
std::vector<ReadBuffer> buffers;
1515
daos_size_t file_size;
16-
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, daos_handle_t eqh,
17-
dfs_obj_t* obj)
16+
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system,
17+
daos_handle_t eqh, dfs_obj_t* obj)
1818
: dfs_path(std::move(dfs_path)) {
1919
daos_fs = file_system;
2020
daos_file.file = obj;
2121
dfs_get_size(daos_fs, obj, &file_size);
2222
size_t num_of_buffers;
2323
size_t buff_size;
2424

25-
if(char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
25+
if (char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
2626
num_of_buffers = atoi(env_num_of_buffers);
27-
}
28-
else {
27+
} else {
2928
num_of_buffers = NUM_OF_BUFFERS;
3029
}
3130

32-
if(char* env_buff_size = std::getenv("TF_IO_DAOS_BUFFER_SIZE")) {
31+
if (char* env_buff_size = std::getenv("TF_IO_DAOS_BUFFER_SIZE")) {
3332
buff_size = GetStorageSize(env_buff_size);
34-
}
35-
else {
33+
} else {
3634
buff_size = BUFF_SIZE;
3735
}
3836
std::cout << buff_size / 1024 / 1024 << std::endl;
3937
std::cout << num_of_buffers << std::endl;
40-
for(size_t i = 0; i < num_of_buffers; i++) {
38+
for (size_t i = 0; i < num_of_buffers; i++) {
4139
buffers.push_back(ReadBuffer(i, eqh, buff_size));
4240
}
4341
}
4442
} DFSRandomAccessFile;
4543

4644
void Cleanup(TF_RandomAccessFile* file) {
4745
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
48-
for(auto& read_buf: dfs_file->buffers) {
46+
for (auto& read_buf : dfs_file->buffers) {
4947
read_buf.AbortEvent();
5048
}
5149
dfs_release(dfs_file->daos_file.file);
@@ -56,21 +54,25 @@ void Cleanup(TF_RandomAccessFile* file) {
5654
int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
5755
char* ret, TF_Status* status) {
5856
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
59-
for(auto& read_buf: dfs_file->buffers) {
60-
if(read_buf.CacheHit(offset, n))
61-
return read_buf.CopyFromCache(ret, offset, n, dfs_file->file_size, status);
57+
for (auto& read_buf : dfs_file->buffers) {
58+
if (read_buf.CacheHit(offset, n))
59+
return read_buf.CopyFromCache(ret, offset, n, dfs_file->file_size,
60+
status);
6261
}
6362

6463
size_t curr_offset = offset + BUFF_SIZE;
65-
for(size_t i = 1; i < dfs_file->buffers.size(); i++) {
66-
if(curr_offset > dfs_file->file_size) break;
67-
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs, dfs_file->daos_file.file, curr_offset);
64+
for (size_t i = 1; i < dfs_file->buffers.size(); i++) {
65+
if (curr_offset > dfs_file->file_size) break;
66+
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs, dfs_file->daos_file.file,
67+
curr_offset);
6868
curr_offset += BUFF_SIZE;
6969
}
7070

71-
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file, offset);
71+
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file,
72+
offset);
7273

73-
return dfs_file->buffers[0].CopyFromCache(ret, offset, n, dfs_file->file_size, status);
74+
return dfs_file->buffers[0].CopyFromCache(ret, offset, n, dfs_file->file_size,
75+
status);
7476
}
7577

7678
} // namespace tf_random_access_file
@@ -204,9 +206,10 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
204206
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
205207
return;
206208
}
207-
auto random_access_file = new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, daos->mEventQueueHandle,
208-
obj);
209-
random_access_file->buffers[0].ReadAsync(daos->daos_fs, random_access_file->daos_file.file, 0);
209+
auto random_access_file = new tf_random_access_file::DFSRandomAccessFile(
210+
path, daos->daos_fs, daos->mEventQueueHandle, obj);
211+
random_access_file->buffers[0].ReadAsync(
212+
daos->daos_fs, random_access_file->daos_file.file, 0);
210213
file->plugin_file = random_access_file;
211214
TF_SetStatus(status, TF_OK, "");
212215
}
@@ -237,7 +240,7 @@ void PathExists(const TF_Filesystem* filesystem, const char* path,
237240
}
238241
std::string pool, cont, file;
239242
rc = daos->Setup(path, pool, cont, file, status);
240-
if(rc) return;
243+
if (rc) return;
241244
dfs_obj_t* obj;
242245
rc = daos->dfsPathExists(file, &obj);
243246
if (rc) {

tensorflow_io/core/filesystems/dfs/dfs_utils.cc

Lines changed: 76 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,30 @@ std::string GetStorageString(uint64_t size) {
1717
size_t GetStorageSize(std::string size) {
1818
char size_char = size.back();
1919
size_t curr_scale = 1;
20-
switch (size_char)
21-
{
22-
case 'K':
23-
size.pop_back();
24-
curr_scale *= 1024;
25-
return (size_t) atoi(size.c_str()) * curr_scale;
26-
break;
27-
case 'M':
28-
size.pop_back();
29-
curr_scale *= 1024 * 1024;
30-
return (size_t) atoi(size.c_str()) * curr_scale;
31-
break;
32-
case 'G':
33-
size.pop_back();
34-
curr_scale *= 1024 * 1024 * 1024;
35-
return (size_t) atoi(size.c_str()) * curr_scale;
36-
break;
37-
case 'T':
38-
size.pop_back();
39-
curr_scale *= 1024 * 1024 * 1024;
40-
return (size_t) atoi(size.c_str()) * curr_scale * 1024;
41-
break;
42-
default:
43-
return atoi(size.c_str());
44-
break;
20+
switch (size_char) {
21+
case 'K':
22+
size.pop_back();
23+
curr_scale *= 1024;
24+
return (size_t)atoi(size.c_str()) * curr_scale;
25+
break;
26+
case 'M':
27+
size.pop_back();
28+
curr_scale *= 1024 * 1024;
29+
return (size_t)atoi(size.c_str()) * curr_scale;
30+
break;
31+
case 'G':
32+
size.pop_back();
33+
curr_scale *= 1024 * 1024 * 1024;
34+
return (size_t)atoi(size.c_str()) * curr_scale;
35+
break;
36+
case 'T':
37+
size.pop_back();
38+
curr_scale *= 1024 * 1024 * 1024;
39+
return (size_t)atoi(size.c_str()) * curr_scale * 1024;
40+
break;
41+
default:
42+
return atoi(size.c_str());
43+
break;
4544
}
4645
}
4746

@@ -61,8 +60,7 @@ int ParseDFSPath(const std::string& path, std::string& pool_string,
6160
attr->da_no_prefix = false;
6261
direct_path = "daos://" + path.substr(pool_start);
6362
rc = duns_resolve_path(direct_path.c_str(), attr);
64-
if(rc)
65-
return rc;
63+
if (rc) return rc;
6664
}
6765
pool_string = attr->da_pool;
6866
cont_string = attr->da_cont;
@@ -97,9 +95,7 @@ DFS::DFS() {
9795
is_initialized = false;
9896
}
9997

100-
DFS::~DFS() {
101-
free(daos_fs);
102-
}
98+
DFS::~DFS() { free(daos_fs); }
10399

104100
DFS* DFS::Load() {
105101
if (!is_initialized) {
@@ -112,10 +108,10 @@ DFS* DFS::Load() {
112108
return this;
113109
}
114110

115-
int DFS::dfsInit() {
111+
int DFS::dfsInit() {
116112
int rc = daos_init();
117-
if(rc) return rc;
118-
return daos_eq_create(&mEventQueueHandle);
113+
if (rc) return rc;
114+
return daos_eq_create(&mEventQueueHandle);
119115
}
120116

121117
void DFS::dfsCleanup() {
@@ -149,7 +145,7 @@ void DFS::Teardown() {
149145
int ret;
150146
do {
151147
ret = daos_eq_poll(mEventQueueHandle, 1, -1, 1, &(temp_event));
152-
} while(ret == 1);
148+
} while (ret == 1);
153149
daos_eq_destroy(mEventQueueHandle, 0);
154150
Unmount();
155151
ClearConnections();
@@ -462,22 +458,23 @@ int DFS::DisconnectContainer(std::string pool_string, std::string cont_string) {
462458
return rc;
463459
}
464460

465-
ReadBuffer::ReadBuffer(size_t id, daos_handle_t eqh, size_t size): id(id), buffer_size(size), eqh(eqh) {
466-
buffer = new char[size];
467-
buffer_offset = 0;
468-
event = new daos_event_t;
469-
daos_event_init(event, eqh,nullptr);
470-
valid = false;
461+
ReadBuffer::ReadBuffer(size_t id, daos_handle_t eqh, size_t size)
462+
: id(id), buffer_size(size), eqh(eqh) {
463+
buffer = new char[size];
464+
buffer_offset = 0;
465+
event = new daos_event_t;
466+
daos_event_init(event, eqh, nullptr);
467+
valid = false;
471468
}
472469

473470
ReadBuffer::~ReadBuffer() {
474-
if(event != nullptr) {
475-
bool event_status;
476-
daos_event_test(event, 0, &event_status);
477-
daos_event_fini(event);
478-
}
479-
delete [] buffer;
480-
delete event;
471+
if (event != nullptr) {
472+
bool event_status;
473+
daos_event_test(event, 0, &event_status);
474+
daos_event_fini(event);
475+
}
476+
delete[] buffer;
477+
delete event;
481478
}
482479

483480
ReadBuffer::ReadBuffer(ReadBuffer&& read_buffer) {
@@ -492,86 +489,78 @@ ReadBuffer::ReadBuffer(ReadBuffer&& read_buffer) {
492489
read_buffer.event = nullptr;
493490
}
494491

495-
bool
496-
ReadBuffer::CacheHit(const size_t pos, const size_t len) {
497-
return pos >= buffer_offset && len <= buffer_size && (pos+len <= buffer_offset + buffer_size);
492+
bool ReadBuffer::CacheHit(const size_t pos, const size_t len) {
493+
return pos >= buffer_offset && len <= buffer_size &&
494+
(pos + len <= buffer_offset + buffer_size);
498495
}
499496

500-
int
501-
ReadBuffer::WaitEvent() {
502-
if(valid) return 0;
497+
int ReadBuffer::WaitEvent() {
498+
if (valid) return 0;
503499
bool event_status;
504500
daos_event_test(event, -1, &event_status);
505-
if(event_status) {
501+
if (event_status) {
506502
valid = true;
507503
return 0;
508504
}
509505
return -1;
510506
}
511507

512-
int
513-
ReadBuffer::AbortEvent() {
508+
int ReadBuffer::AbortEvent() {
514509
bool event_status = false;
515510
daos_event_test(event, 0, &event_status);
516-
if(!event_status)
511+
if (!event_status)
517512
return daos_event_abort(event);
518513
else
519514
return 0;
520515
}
521516

522-
int
523-
ReadBuffer::ReadAsync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
517+
int ReadBuffer::ReadAsync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
524518
int rc = AbortEvent();
525-
if(rc) return rc;
519+
if (rc) return rc;
526520
d_iov_set(&iov, (void*)buffer, buffer_size);
527521
rsgl.sg_nr = 1;
528522
rsgl.sg_iovs = &iov;
529523
valid = false;
530524
buffer_offset = off;
531-
dfs_read(daos_fs, file, &rsgl,
532-
buffer_offset, &read_size, event);
525+
dfs_read(daos_fs, file, &rsgl, buffer_offset, &read_size, event);
533526
return 0;
534527
}
535528

536-
int
537-
ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
529+
int ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
538530
int rc = AbortEvent();
539-
if(rc) return rc;
531+
if (rc) return rc;
540532
d_iov_set(&iov, (void*)buffer, buffer_size);
541533
rsgl.sg_nr = 1;
542534
rsgl.sg_iovs = &iov;
543535
valid = false;
544536
buffer_offset = off;
545-
rc = dfs_read(daos_fs, file, &rsgl,
546-
off, &read_size, NULL);
537+
rc = dfs_read(daos_fs, file, &rsgl, off, &read_size, NULL);
547538
valid = true;
548539
return rc;
549540
}
550541

551-
int
552-
ReadBuffer::CopyData(char* ret, const size_t off, const size_t n) {
542+
int ReadBuffer::CopyData(char* ret, const size_t off, const size_t n) {
553543
int rc = WaitEvent();
554-
if(rc) return rc;
555-
memcpy(ret, buffer + (off - buffer_offset), n);
544+
if (rc) return rc;
545+
memcpy(ret, buffer + (off - buffer_offset), n);
556546
return 0;
557547
}
558548

559-
int
560-
ReadBuffer::CopyFromCache(char* ret, const size_t off, const size_t n, const daos_size_t file_size, TF_Status* status){
561-
size_t read_size;
562-
read_size = off + n > file_size? file_size - off : n;
563-
int rc = CopyData(ret, off, read_size);
564-
if (rc) {
565-
TF_SetStatus(status, TF_INTERNAL, "");
566-
return 0;
567-
}
568-
569-
if (off + n > file_size) {
570-
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
571-
return read_size;
572-
}
549+
int ReadBuffer::CopyFromCache(char* ret, const size_t off, const size_t n,
550+
const daos_size_t file_size, TF_Status* status) {
551+
size_t read_size;
552+
read_size = off + n > file_size ? file_size - off : n;
553+
int rc = CopyData(ret, off, read_size);
554+
if (rc) {
555+
TF_SetStatus(status, TF_INTERNAL, "");
556+
return 0;
557+
}
573558

574-
TF_SetStatus(status, TF_OK, "");
559+
if (off + n > file_size) {
560+
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
575561
return read_size;
576-
}
562+
}
577563

564+
TF_SetStatus(status, TF_OK, "");
565+
return read_size;
566+
}

0 commit comments

Comments
 (0)