@@ -22,25 +22,33 @@ size_t GetStorageSize(std::string size) {
2222 size.pop_back ();
2323 curr_scale *= 1024 ;
2424 return (size_t )atoi (size.c_str ()) * curr_scale;
25- break ;
2625 case ' M' :
2726 size.pop_back ();
2827 curr_scale *= 1024 * 1024 ;
2928 return (size_t )atoi (size.c_str ()) * curr_scale;
30- break ;
3129 case ' G' :
3230 size.pop_back ();
3331 curr_scale *= 1024 * 1024 * 1024 ;
3432 return (size_t )atoi (size.c_str ()) * curr_scale;
35- break ;
3633 case ' T' :
3734 size.pop_back ();
3835 curr_scale *= 1024 * 1024 * 1024 ;
3936 return (size_t )atoi (size.c_str ()) * curr_scale * 1024 ;
40- break ;
4137 default :
4238 return atoi (size.c_str ());
43- break ;
39+ }
40+ }
41+
42+ mode_t GetFlags (File_Mode mode) {
43+ switch (mode) {
44+ case READ:
45+ return O_RDONLY;
46+ case WRITE:
47+ return O_WRONLY | O_CREAT;
48+ case APPEND:
49+ return O_WRONLY | O_APPEND | O_CREAT;
50+ default :
51+ return -1 ;
4452 }
4553}
4654
@@ -74,8 +82,8 @@ int ParseUUID(const std::string& str, uuid_t uuid) {
7482}
7583
7684void CopyEntries (char *** entries, std::vector<std::string>& results) {
77- *entries = static_cast <char **>(tensorflow::io::plugin_memory_allocate (
78- results.size () * sizeof ((*entries)[ 0 ] )));
85+ *entries = static_cast <char **>(
86+ tensorflow::io::plugin_memory_allocate ( results.size () * sizeof (char * )));
7987
8088 for (uint32_t i = 0 ; i < results.size (); i++) {
8189 (*entries)[i] = static_cast <char *>(tensorflow::io::plugin_memory_allocate (
@@ -267,10 +275,47 @@ int DFS::ClearConnections() {
267275 return rc;
268276}
269277
270- void DFS::dfsNewFile (std::string& file_path, mode_t mode, int flags,
278+ int DFS::dfsDeleteObject (std::string& dir_path, bool is_dir, bool recursive,
279+ TF_Status* status) {
280+ dfs_obj_t * temp_obj;
281+ int rc = dfsPathExists (dir_path, &temp_obj, 0 );
282+ if (rc) {
283+ TF_SetStatus (status, TF_NOT_FOUND, " " );
284+ return -1 ;
285+ }
286+ if (!is_dir && S_ISDIR (temp_obj->mode )) {
287+ TF_SetStatus (status, TF_FAILED_PRECONDITION, " " );
288+ return -1 ;
289+ }
290+ dfs_release (temp_obj);
291+
292+ size_t dir_start = dir_path.rfind (" /" ) + 1 ;
293+ std::string dir = dir_path.substr (dir_start);
294+ dfs_obj_t * parent;
295+ rc = dfsFindParent (dir_path, &parent);
296+ if (rc) {
297+ TF_SetStatus (status, TF_NOT_FOUND, " " );
298+ return -1 ;
299+ }
300+
301+ rc = dfs_remove (daos_fs, parent, dir.c_str (), recursive, NULL );
302+
303+ dfs_release (parent);
304+
305+ if (rc) {
306+ TF_SetStatus (status, TF_INTERNAL, " Error Deleting Existing Object" );
307+ } else {
308+ TF_SetStatus (status, TF_OK, " " );
309+ }
310+
311+ return rc;
312+ }
313+
314+ void DFS::dfsNewFile (std::string& file_path, File_Mode file_mode, int flags,
271315 dfs_obj_t ** obj, TF_Status* status) {
272316 int rc;
273317 dfs_obj_t * temp_obj;
318+ mode_t open_flags;
274319 rc = dfsPathExists (file_path, &temp_obj, 0 );
275320 if (rc && flags == O_RDONLY) {
276321 TF_SetStatus (status, TF_NOT_FOUND, " " );
@@ -287,6 +332,13 @@ void DFS::dfsNewFile(std::string& file_path, mode_t mode, int flags,
287332 dfs_release (temp_obj);
288333 }
289334
335+ if (!rc && file_mode == WRITE) {
336+ rc = dfsDeleteObject (file_path, false , false , status);
337+ if (rc) return ;
338+ }
339+
340+ open_flags = GetFlags (file_mode);
341+
290342 dfs_obj_t * parent;
291343 rc = dfsFindParent (file_path, &parent);
292344 if (rc) {
@@ -302,8 +354,8 @@ void DFS::dfsNewFile(std::string& file_path, mode_t mode, int flags,
302354 size_t file_start = file_path.rfind (" /" ) + 1 ;
303355 std::string file_name = file_path.substr (file_start);
304356
305- rc = dfs_open (daos_fs, parent, file_name.c_str (), mode, flags, 0 , 0 , NULL ,
306- obj);
357+ rc = dfs_open (daos_fs, parent, file_name.c_str (), flags, open_flags , 0 , 0 ,
358+ NULL , obj);
307359 if (rc) {
308360 TF_SetStatus (status, TF_INTERNAL, " Error Creating Writable File" );
309361 return ;
@@ -372,14 +424,17 @@ int DFS::dfsReadDir(dfs_obj_t* obj, std::vector<std::string>& children) {
372424 daos_anchor_t anchor = {0 };
373425 uint32_t nr = STACK;
374426 struct dirent * dirs = (struct dirent *)malloc (nr * sizeof (struct dirent ));
427+ while (!daos_anchor_is_eof (&anchor)) {
428+ rc = dfs_readdir (daos_fs, obj, &anchor, &nr, dirs);
429+ if (rc) {
430+ return rc;
431+ }
375432
376- rc = dfs_readdir (daos_fs, obj, &anchor, &nr, dirs);
377- if (rc) {
378- return rc;
379- }
380-
381- for (uint32_t i = 0 ; i < nr; i++) {
382- children.push_back (dirs[i].d_name );
433+ for (uint32_t i = 0 ; i < nr; i++) {
434+ // std::cout << "Before File " << dirs[i].d_name << std::endl;
435+ children.emplace_back (dirs[i].d_name );
436+ // std::cout << "After File " << dirs[i].d_name << std::endl;
437+ }
383438 }
384439
385440 free (dirs);
@@ -498,11 +553,11 @@ int ReadBuffer::WaitEvent() {
498553 if (valid) return 0 ;
499554 bool event_status;
500555 daos_event_test (event, -1 , &event_status);
501- if (event_status) {
556+ if (event_status && event-> ev_error == DER_SUCCESS ) {
502557 valid = true ;
503558 return 0 ;
504559 }
505- return - 1 ;
560+ return event-> ev_error ;
506561}
507562
508563int ReadBuffer::AbortEvent () {
@@ -535,7 +590,7 @@ int ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, const size_t off) {
535590 valid = false ;
536591 buffer_offset = off;
537592 rc = dfs_read (daos_fs, file, &rsgl, off, &read_size, NULL );
538- valid = true ;
593+ if (!rc) valid = true ;
539594 return rc;
540595}
541596
0 commit comments