@@ -22,28 +22,20 @@ import (
2222 "path"
2323 "strconv"
2424 "strings"
25- "sync"
2625 "time"
2726
2827 "github.com/pkg/errors"
29-
3028 "github.com/quickfixgo/quickfix"
3129 "github.com/quickfixgo/quickfix/config"
3230)
3331
34- type msgDef struct {
35- offset int64
36- size int
37- }
38-
3932type fileStoreFactory struct {
4033 settings * quickfix.Settings
4134}
4235
4336type fileStore struct {
4437 sessionID quickfix.SessionID
4538 cache quickfix.MessageStore
46- offsets sync.Map
4739 bodyFname string
4840 headerFname string
4941 sessionFname string
@@ -107,7 +99,6 @@ func newFileStore(sessionID quickfix.SessionID, dirname string, fileSync bool) (
10799 store := & fileStore {
108100 sessionID : sessionID ,
109101 cache : memStore ,
110- offsets : sync.Map {},
111102 bodyFname : path .Join (dirname , fmt .Sprintf ("%s.%s" , sessionPrefix , "body" )),
112103 headerFname : path .Join (dirname , fmt .Sprintf ("%s.%s" , sessionPrefix , "header" )),
113104 sessionFname : path .Join (dirname , fmt .Sprintf ("%s.%s" , sessionPrefix , "session" )),
@@ -199,18 +190,6 @@ func (store *fileStore) Refresh() (err error) {
199190}
200191
201192func (store * fileStore ) populateCache () (creationTimePopulated bool , err error ) {
202- if tmpHeaderFile , err := os .Open (store .headerFname ); err == nil {
203- defer tmpHeaderFile .Close ()
204- for {
205- var seqNum , size int
206- var offset int64
207- if cnt , err := fmt .Fscanf (tmpHeaderFile , "%d,%d,%d\n " , & seqNum , & offset , & size ); err != nil || cnt != 3 {
208- break
209- }
210- store .offsets .Store (seqNum , msgDef {offset : offset , size : size })
211- }
212- }
213-
214193 if timeBytes , err := os .ReadFile (store .sessionFname ); err == nil {
215194 var ctime time.Time
216195 if err := ctime .UnmarshalText (timeBytes ); err == nil {
@@ -348,7 +327,6 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
348327 }
349328 }
350329
351- store .offsets .Store (seqNum , msgDef {offset : offset , size : len (msg )})
352330 return nil
353331}
354332
@@ -360,34 +338,38 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []
360338 return store .IncrNextSenderMsgSeqNum ()
361339}
362340
363- func (store * fileStore ) getMessage (seqNum int ) (msg []byte , found bool , err error ) {
364- msgInfoTemp , found := store .offsets .Load (seqNum )
365- if ! found {
366- return
367- }
368- msgInfo , ok := msgInfoTemp .(msgDef )
369- if ! ok {
370- return nil , true , fmt .Errorf ("incorrect msgInfo type while reading file: %s" , store .bodyFname )
371- }
372-
373- msg = make ([]byte , msgInfo .size )
374- if _ , err = store .bodyFile .ReadAt (msg , msgInfo .offset ); err != nil {
375- return nil , true , fmt .Errorf ("unable to read from file: %s: %s" , store .bodyFname , err .Error ())
376- }
377-
378- return msg , true , nil
379- }
380-
381341func (store * fileStore ) IterateMessages (beginSeqNum , endSeqNum int , cb func ([]byte ) error ) error {
382- for seqNum := beginSeqNum ; seqNum <= endSeqNum ; seqNum ++ {
383- m , found , err := store .getMessage (seqNum )
384- if err != nil {
385- return err
386- }
387- if found {
388- if err = cb (m ); err != nil {
389- return err
342+ // Sync files and seek to start of header file
343+ if err := store .bodyFile .Sync (); err != nil {
344+ return fmt .Errorf ("unable to flush file: %s: %s" , store .bodyFname , err .Error ())
345+ } else if err = store .headerFile .Sync (); err != nil {
346+ return fmt .Errorf ("unable to flush file: %s: %s" , store .headerFname , err .Error ())
347+ } else if _ , err = store .headerFile .Seek (0 , io .SeekStart ); err != nil {
348+ return fmt .Errorf ("unable to seek to start of file: %s: %s" , store .headerFname , err .Error ())
349+ }
350+
351+ // Iterate over the header file
352+ for {
353+ var seqNum , size int
354+ var offset int64
355+ if cnt , err := fmt .Fscanf (store .headerFile , "%d,%d,%d\n " , & seqNum , & offset , & size ); err != nil {
356+ if errors .Is (err , io .EOF ) {
357+ break
390358 }
359+ return fmt .Errorf ("unable to read from file: %s: %s" , store .headerFname , err .Error ())
360+ } else if cnt < 3 || seqNum > endSeqNum {
361+ // If we have reached the end of possible iteration then break
362+ break
363+ } else if seqNum < beginSeqNum {
364+ // If we have not yet reached the starting sequence number then continue
365+ continue
366+ }
367+ // Otherwise process the file
368+ msg := make ([]byte , size )
369+ if _ , err := store .bodyFile .ReadAt (msg , offset ); err != nil {
370+ return fmt .Errorf ("unable to read from file: %s: %s" , store .bodyFname , err .Error ())
371+ } else if err = cb (msg ); err != nil {
372+ return err
391373 }
392374 }
393375 return nil
0 commit comments