@@ -13,14 +13,15 @@ typedef struct DFSRandomAccessFile {
1313 DAOS_FILE daos_file;
1414 std::vector<ReadBuffer> buffers;
1515 daos_size_t file_size;
16- DFSRandomAccessFile (std::string dfs_path, dfs_t * file_system,
17- daos_handle_t eqh , dfs_obj_t * obj)
16+ daos_handle_t mEventQueueHandle {};
17+ DFSRandomAccessFile (std::string dfs_path, dfs_t * file_system , dfs_obj_t * obj)
1818 : dfs_path(std::move(dfs_path)) {
1919 daos_fs = file_system;
2020 daos_file.file = obj;
2121 dfs_get_size (daos_fs, obj, &file_size);
2222 size_t num_of_buffers;
2323 size_t buff_size;
24+ int rc = daos_eq_create (&mEventQueueHandle );
2425
2526 if (char * env_num_of_buffers = std::getenv (" TF_IO_DAOS_NUM_OF_BUFFERS" )) {
2627 num_of_buffers = atoi (env_num_of_buffers);
@@ -34,16 +35,18 @@ typedef struct DFSRandomAccessFile {
3435 buff_size = BUFF_SIZE;
3536 }
3637 for (size_t i = 0 ; i < num_of_buffers; i++) {
37- buffers.push_back (ReadBuffer (i, eqh , buff_size));
38+ buffers.push_back (ReadBuffer (i, mEventQueueHandle , buff_size));
3839 }
3940 }
4041} DFSRandomAccessFile;
4142
4243void Cleanup (TF_RandomAccessFile* file) {
4344 auto dfs_file = static_cast <DFSRandomAccessFile*>(file->plugin_file );
44- for (auto & read_buf : dfs_file->buffers ) {
45- read_buf. AbortEvent ();
45+ for (auto & buffer : dfs_file->buffers ) {
46+ buffer. FinalizeEvent ();
4647 }
48+
49+ daos_eq_destroy (dfs_file->mEventQueueHandle , 0 );
4750 dfs_release (dfs_file->daos_file .file );
4851 dfs_file->daos_fs = nullptr ;
4952 delete dfs_file;
@@ -52,7 +55,11 @@ void Cleanup(TF_RandomAccessFile* file) {
5255int64_t Read (const TF_RandomAccessFile* file, uint64_t offset, size_t n,
5356 char * ret, TF_Status* status) {
5457 auto dfs_file = static_cast <DFSRandomAccessFile*>(file->plugin_file );
55- if (offset > dfs_file->file_size ) return -1 ;
58+ if (offset > dfs_file->file_size ) {
59+ TF_SetStatus (status, TF_OUT_OF_RANGE, " " );
60+ return -1 ;
61+ }
62+
5663 size_t ret_offset = 0 ;
5764 size_t curr_offset = offset;
5865 int64_t total_bytes = 0 ;
@@ -61,7 +68,7 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
6168 size_t read_bytes = 0 ;
6269 for (auto & read_buf : dfs_file->buffers ) {
6370 if (read_buf.CacheHit (curr_offset)) {
64- read_bytes = read_buf.CopyFromCache (ret, ret_offset, offset , n,
71+ read_bytes = read_buf.CopyFromCache (ret, ret_offset, curr_offset , n,
6572 dfs_file->file_size , status);
6673 }
6774 }
@@ -92,6 +99,12 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
9299 ret_offset += read_bytes;
93100 total_bytes += read_bytes;
94101 n -= read_bytes;
102+
103+ if (curr_offset >= dfs_file->file_size ) {
104+ for (size_t i = 0 ; i < dfs_file->buffers .size (); i++) {
105+ dfs_file->buffers [i].WaitEvent ();
106+ }
107+ }
95108 }
96109
97110 return total_bytes;
@@ -227,8 +240,8 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
227240 TF_SetStatus (status, TF_INTERNAL, " Error initializng DAOS API" );
228241 return ;
229242 }
230- auto random_access_file = new tf_random_access_file::DFSRandomAccessFile (
231- path, daos->daos_fs , daos-> mEventQueueHandle , obj);
243+ auto random_access_file =
244+ new tf_random_access_file::DFSRandomAccessFile ( path, daos->daos_fs , obj);
232245 random_access_file->buffers [0 ].ReadAsync (
233246 daos->daos_fs , random_access_file->daos_file .file , 0 );
234247 file->plugin_file = random_access_file;
0 commit comments