Skip to content

Commit ad74d03

Browse files
committed
Introduce message iterator to avoid loading all messages into memory at once upon resend request
1 parent e3a2994 commit ad74d03

File tree

7 files changed

+104
-56
lines changed

7 files changed

+104
-56
lines changed

in_session.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -225,38 +225,31 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next
225225
return state
226226
}
227227

228-
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int, inReplyTo Message) (err error) {
228+
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int, inReplyTo Message) error {
229229
if session.DisableMessagePersist {
230-
err = state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo)
231-
return
232-
}
233-
234-
msgs, err := session.store.GetMessages(beginSeqNo, endSeqNo)
235-
if err != nil {
236-
session.log.OnEventf("error retrieving messages from store: %s", err.Error())
237-
return
230+
return state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo)
238231
}
239232

240233
seqNum := beginSeqNo
241234
nextSeqNum := seqNum
242235
msg := NewMessage()
243-
for _, msgBytes := range msgs {
244-
err = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
236+
err := session.store.IterateMessages(beginSeqNo, endSeqNo, func(msgBytes []byte) error {
237+
err := ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
245238
if err != nil {
246239
session.log.OnEventf("Resend Msg Parse Error: %v, %v", err.Error(), bytes.NewBuffer(msgBytes).String())
247-
return // We cant continue with a message that cant be parsed correctly.
240+
return err // We cant continue with a message that cant be parsed correctly.
248241
}
249242
msgType, _ := msg.Header.GetBytes(tagMsgType)
250243
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)
251244

252245
if isAdminMessageType(msgType) {
253246
nextSeqNum = sentMessageSeqNum + 1
254-
continue
247+
return nil
255248
}
256249

257250
if !session.resend(msg) {
258251
nextSeqNum = sentMessageSeqNum + 1
259-
continue
252+
return nil
260253
}
261254

262255
if seqNum != sentMessageSeqNum {
@@ -271,6 +264,11 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
271264

272265
seqNum = sentMessageSeqNum + 1
273266
nextSeqNum = seqNum
267+
return nil
268+
})
269+
if err != nil {
270+
session.log.OnEventf("error retrieving messages from store: %s", err.Error())
271+
return err
274272
}
275273

276274
if seqNum != nextSeqNum { // gapfill for catch-up
@@ -279,7 +277,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
279277
}
280278
}
281279

282-
return
280+
return nil
283281
}
284282

285283
func (state inSession) processReject(session *session, msg *Message, rej MessageRejectError) sessionState {

internal/testsuite/store_suite.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,28 @@ func (s *StoreTestSuite) TestMessageStoreReset() {
7777
s.Equal(1, s.MsgStore.NextTargetMsgSeqNum())
7878
}
7979

80+
func (s *StoreTestSuite) fetchMessages(beginSeqNum, endSeqNum int) (msgs [][]byte) {
81+
s.T().Helper()
82+
83+
// Fetch messages from the new iterator
84+
err := s.MsgStore.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error {
85+
msgs = append(msgs, msg)
86+
return nil
87+
})
88+
s.Require().Nil(err)
89+
90+
// Fetch messages from the old getter
91+
oldMsgs, err := s.MsgStore.GetMessages(beginSeqNum, endSeqNum)
92+
s.Require().Nil(err)
93+
94+
// Ensure the output is the same
95+
s.Require().Len(msgs, len(oldMsgs))
96+
for idx, msg := range msgs {
97+
s.Require().EqualValues(msg, oldMsgs[idx])
98+
}
99+
return
100+
}
101+
80102
func (s *StoreTestSuite) TestMessageStoreSaveMessageGetMessage() {
81103
// Given the following saved messages
82104
expectedMsgsBySeqNum := map[int]string{
@@ -89,8 +111,7 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageGetMessage() {
89111
}
90112

91113
// When the messages are retrieved from the MessageStore
92-
actualMsgs, err := s.MsgStore.GetMessages(1, 3)
93-
s.Require().Nil(err)
114+
actualMsgs := s.fetchMessages(1, 3)
94115

95116
// Then the messages should be
96117
s.Require().Len(actualMsgs, 3)
@@ -102,8 +123,7 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageGetMessage() {
102123
s.Require().Nil(s.MsgStore.Refresh())
103124

104125
// And the messages are retrieved from the MessageStore
105-
actualMsgs, err = s.MsgStore.GetMessages(1, 3)
106-
s.Require().Nil(err)
126+
actualMsgs = s.fetchMessages(1, 3)
107127

108128
// Then the messages should still be
109129
s.Require().Len(actualMsgs, 3)
@@ -127,8 +147,7 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() {
127147
s.Equal(423, s.MsgStore.NextSenderMsgSeqNum())
128148

129149
// When the messages are retrieved from the MessageStore
130-
actualMsgs, err := s.MsgStore.GetMessages(1, 3)
131-
s.Require().Nil(err)
150+
actualMsgs := s.fetchMessages(1, 3)
132151

133152
// Then the messages should be
134153
s.Require().Len(actualMsgs, 3)
@@ -140,8 +159,7 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() {
140159
s.Require().Nil(s.MsgStore.Refresh())
141160

142161
// And the messages are retrieved from the MessageStore
143-
actualMsgs, err = s.MsgStore.GetMessages(1, 3)
144-
s.Require().Nil(err)
162+
actualMsgs = s.fetchMessages(1, 3)
145163

146164
s.Equal(423, s.MsgStore.NextSenderMsgSeqNum())
147165

@@ -154,8 +172,7 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() {
154172

155173
func (s *StoreTestSuite) TestMessageStoreGetMessagesEmptyStore() {
156174
// When messages are retrieved from an empty store
157-
messages, err := s.MsgStore.GetMessages(1, 2)
158-
require.Nil(s.T(), err)
175+
messages := s.fetchMessages(1, 2)
159176

160177
// Then no messages should be returned
161178
require.Empty(s.T(), messages, "Did not expect messages from empty store")
@@ -187,8 +204,7 @@ func (s *StoreTestSuite) TestMessageStoreGetMessagesVariousRanges() {
187204

188205
// Then the returned messages should be
189206
for _, tc := range testCases {
190-
actualMsgs, err := s.MsgStore.GetMessages(tc.beginSeqNo, tc.endSeqNo)
191-
require.Nil(t, err)
207+
actualMsgs := s.fetchMessages(tc.beginSeqNo, tc.endSeqNo)
192208
require.Len(t, actualMsgs, len(tc.expectedBytes))
193209
for i, expectedMsg := range tc.expectedBytes {
194210
assert.Equal(t, string(expectedMsg), string(actualMsgs[i]))

memorystore.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,24 @@ func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg
9797
return store.IncrNextSenderMsgSeqNum()
9898
}
9999

100-
func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
101-
var msgs [][]byte
100+
func (store *memoryStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
102101
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
103102
if m, ok := store.messageMap[seqNum]; ok {
104-
msgs = append(msgs, m)
103+
if err := cb(m); err != nil {
104+
return err
105+
}
105106
}
106107
}
107-
return msgs, nil
108+
return nil
109+
}
110+
111+
func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
112+
var msgs [][]byte
113+
err := store.IterateMessages(beginSeqNum, endSeqNum, func(m []byte) error {
114+
msgs = append(msgs, m)
115+
return nil
116+
})
117+
return msgs, err
108118
}
109119

110120
type memoryStoreFactory struct{}

store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type MessageStore interface {
3636
SaveMessage(seqNum int, msg []byte) error
3737
SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error
3838
GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error)
39+
IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error
3940

4041
Refresh() error
4142
Reset() error

store/file/filestore.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -378,18 +378,28 @@ func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err erro
378378
return msg, true, nil
379379
}
380380

381-
func (store *fileStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
382-
var msgs [][]byte
381+
func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
383382
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
384383
m, found, err := store.getMessage(seqNum)
385384
if err != nil {
386-
return nil, err
385+
return err
387386
}
388387
if found {
389-
msgs = append(msgs, m)
388+
if err = cb(m); err != nil {
389+
return err
390+
}
390391
}
391392
}
392-
return msgs, nil
393+
return nil
394+
}
395+
396+
func (store *fileStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
397+
var msgs [][]byte
398+
err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error {
399+
msgs = append(msgs, msg)
400+
return nil
401+
})
402+
return msgs, err
393403
}
394404

395405
// Close closes the store's files.

store/mongo/mongostore.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,17 +338,17 @@ func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg [
338338
return store.cache.SetNextSenderMsgSeqNum(next)
339339
}
340340

341-
func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, err error) {
341+
func (store *mongoStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
342342
msgFilter := generateMessageFilter(&store.sessionID)
343343
// Marshal into database form.
344344
msgFilterBytes, err := bson.Marshal(msgFilter)
345345
if err != nil {
346-
return
346+
return err
347347
}
348348
seqFilter := bson.M{}
349349
err = bson.Unmarshal(msgFilterBytes, &seqFilter)
350350
if err != nil {
351-
return
351+
return err
352352
}
353353
// Modify the query to use a range for the sequence filter.
354354
seqFilter["msgseq"] = bson.M{
@@ -358,18 +358,26 @@ func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte,
358358
sortOpt := options.Find().SetSort(bson.D{{Key: "msgseq", Value: 1}})
359359
cursor, err := store.db.Database(store.mongoDatabase).Collection(store.messagesCollection).Find(context.Background(), seqFilter, sortOpt)
360360
if err != nil {
361-
return
361+
return err
362362
}
363-
363+
defer func() { _ = cursor.Close(context.Background()) }()
364364
for cursor.Next(context.Background()) {
365365
if err = cursor.Decode(&msgFilter); err != nil {
366-
return
366+
return err
367+
} else if err = cb(msgFilter.Message); err != nil {
368+
return err
367369
}
368-
msgs = append(msgs, msgFilter.Message)
369370
}
371+
return nil
372+
}
370373

371-
err = cursor.Close(context.Background())
372-
return
374+
func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
375+
var msgs [][]byte
376+
err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error {
377+
msgs = append(msgs, msg)
378+
return nil
379+
})
380+
return msgs, err
373381
}
374382

375383
// Close closes the store's database connection.

store/sql/sqlstore.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,8 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b
352352
return store.cache.SetNextSenderMsgSeqNum(next)
353353
}
354354

355-
func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
355+
func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
356356
s := store.sessionID
357-
var msgs [][]byte
358357
rows, err := store.db.Query(sqlString(`SELECT message FROM messages
359358
WHERE beginstring=? AND session_qualifier=?
360359
AND sendercompid=? AND sendersubid=? AND senderlocid=?
@@ -366,23 +365,29 @@ func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error)
366365
s.TargetCompID, s.TargetSubID, s.TargetLocationID,
367366
beginSeqNum, endSeqNum)
368367
if err != nil {
369-
return nil, err
368+
return err
370369
}
371-
defer rows.Close()
370+
defer func() { _ = rows.Close() }()
372371

373372
for rows.Next() {
374373
var message string
375-
if err := rows.Scan(&message); err != nil {
376-
return nil, err
374+
if err = rows.Scan(&message); err != nil {
375+
return err
376+
} else if err = cb([]byte(message)); err != nil {
377+
return err
377378
}
378-
msgs = append(msgs, []byte(message))
379379
}
380380

381-
if err := rows.Err(); err != nil {
382-
return nil, err
383-
}
381+
return rows.Err()
382+
}
384383

385-
return msgs, nil
384+
func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
385+
var msgs [][]byte
386+
err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error {
387+
msgs = append(msgs, msg)
388+
return nil
389+
})
390+
return msgs, err
386391
}
387392

388393
// Close closes the store's database connection.

0 commit comments

Comments
 (0)