Skip to content

Commit ee9b2da

Browse files
korzhaAndrei Korzhevskiichrjohn
authored
[QFJ-968] Allow provision of custom Message Queue implementation (#380)
* [QFJ-968] Allow provision of custom Message Queue implementation Co-authored-by: Andrei Korzhevskii <andrei.korzhevskii@gs.com> Co-authored-by: Christoph John <christoph.john@macd.com>
1 parent 0b60ffe commit ee9b2da

File tree

10 files changed

+280
-64
lines changed

10 files changed

+280
-64
lines changed

quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class DefaultSessionFactory implements SessionFactory {
4747

4848
private final Application application;
4949
private final MessageStoreFactory messageStoreFactory;
50+
private final MessageQueueFactory messageQueueFactory;
5051
private final LogFactory logFactory;
5152
private final MessageFactory messageFactory;
5253
private final SessionScheduleFactory sessionScheduleFactory;
@@ -55,6 +56,7 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag
5556
LogFactory logFactory) {
5657
this.application = application;
5758
this.messageStoreFactory = messageStoreFactory;
59+
this.messageQueueFactory = new InMemoryMessageQueueFactory();
5860
this.logFactory = logFactory;
5961
this.messageFactory = new DefaultMessageFactory();
6062
this.sessionScheduleFactory = new DefaultSessionScheduleFactory();
@@ -64,6 +66,7 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag
6466
LogFactory logFactory, MessageFactory messageFactory) {
6567
this.application = application;
6668
this.messageStoreFactory = messageStoreFactory;
69+
this.messageQueueFactory = new InMemoryMessageQueueFactory();
6770
this.logFactory = logFactory;
6871
this.messageFactory = messageFactory;
6972
this.sessionScheduleFactory = new DefaultSessionScheduleFactory();
@@ -74,6 +77,18 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag
7477
SessionScheduleFactory sessionScheduleFactory) {
7578
this.application = application;
7679
this.messageStoreFactory = messageStoreFactory;
80+
this.messageQueueFactory = new InMemoryMessageQueueFactory();
81+
this.logFactory = logFactory;
82+
this.messageFactory = messageFactory;
83+
this.sessionScheduleFactory = sessionScheduleFactory;
84+
}
85+
86+
public DefaultSessionFactory(Application application, MessageStoreFactory messageStoreFactory,
87+
MessageQueueFactory messageQueueFactory, LogFactory logFactory,
88+
MessageFactory messageFactory, SessionScheduleFactory sessionScheduleFactory) {
89+
this.application = application;
90+
this.messageStoreFactory = messageStoreFactory;
91+
this.messageQueueFactory = messageQueueFactory;
7792
this.logFactory = logFactory;
7893
this.messageFactory = messageFactory;
7994
this.sessionScheduleFactory = sessionScheduleFactory;
@@ -222,8 +237,8 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
222237

223238
final List<StringField> logonTags = getLogonTags(settings, sessionID);
224239

225-
final Session session = new Session(application, messageStoreFactory, sessionID,
226-
dataDictionaryProvider, sessionSchedule, logFactory,
240+
final Session session = new Session(application, messageStoreFactory, messageQueueFactory,
241+
sessionID, dataDictionaryProvider, sessionSchedule, logFactory,
227242
messageFactory, heartbeatInterval, checkLatency, maxLatency, timestampPrecision,
228243
resetOnLogon, resetOnLogout, resetOnDisconnect, refreshOnLogon, checkCompID,
229244
redundantResentRequestAllowed, persistMessages, useClosedIntervalForResend,
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*******************************************************************************
2+
* Copyright (c) quickfixengine.org All rights reserved.
3+
*
4+
* This file is part of the QuickFIX FIX Engine
5+
*
6+
* This file may be distributed under the terms of the quickfixengine.org
7+
* license as defined by quickfixengine.org and appearing in the file
8+
* LICENSE included in the packaging of this file.
9+
*
10+
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
11+
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
12+
* PARTICULAR PURPOSE.
13+
*
14+
* See http://www.quickfixengine.org/LICENSE for licensing information.
15+
*
16+
* Contact ask@quickfixengine.org if any conditions of this licensing
17+
* are not clear to you.
18+
******************************************************************************/
19+
20+
package quickfix;
21+
22+
import java.util.LinkedHashMap;
23+
import java.util.Map;
24+
25+
/**
26+
* An in-memory implementation of MessageQueue.
27+
* It uses a linked hash map as a backing map.
28+
*
29+
* @see MessageQueue
30+
*/
31+
public class InMemoryMessageQueue implements MessageQueue {
32+
33+
// The map should be accessed from a single thread
34+
private final Map<Integer, Message> backingMap = new LinkedHashMap<>();
35+
36+
@Override
37+
public void enqueue(int sequence, Message message) {
38+
backingMap.put(sequence, message);
39+
}
40+
41+
@Override
42+
public Message dequeue(int sequence) {
43+
return backingMap.remove(sequence);
44+
}
45+
46+
@Override
47+
public void clear() {
48+
backingMap.clear();
49+
}
50+
51+
// used in tests
52+
Map<Integer, Message> getBackingMap() {
53+
return backingMap;
54+
}
55+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*******************************************************************************
2+
* Copyright (c) quickfixengine.org All rights reserved.
3+
*
4+
* This file is part of the QuickFIX FIX Engine
5+
*
6+
* This file may be distributed under the terms of the quickfixengine.org
7+
* license as defined by quickfixengine.org and appearing in the file
8+
* LICENSE included in the packaging of this file.
9+
*
10+
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
11+
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
12+
* PARTICULAR PURPOSE.
13+
*
14+
* See http://www.quickfixengine.org/LICENSE for licensing information.
15+
*
16+
* Contact ask@quickfixengine.org if any conditions of this licensing
17+
* are not clear to you.
18+
******************************************************************************/
19+
20+
package quickfix;
21+
22+
/**
23+
* Creates a message queue that stores all messages in memory.
24+
*
25+
* @see MessageQueue
26+
*/
27+
public class InMemoryMessageQueueFactory implements MessageQueueFactory {
28+
29+
@Override
30+
public MessageQueue create(SessionID sessionID) {
31+
return new InMemoryMessageQueue();
32+
}
33+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*******************************************************************************
2+
* Copyright (c) quickfixengine.org All rights reserved.
3+
*
4+
* This file is part of the QuickFIX FIX Engine
5+
*
6+
* This file may be distributed under the terms of the quickfixengine.org
7+
* license as defined by quickfixengine.org and appearing in the file
8+
* LICENSE included in the packaging of this file.
9+
*
10+
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
11+
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
12+
* PARTICULAR PURPOSE.
13+
*
14+
* See http://www.quickfixengine.org/LICENSE for licensing information.
15+
*
16+
* Contact ask@quickfixengine.org if any conditions of this licensing
17+
* are not clear to you.
18+
******************************************************************************/
19+
20+
package quickfix;
21+
22+
/**
23+
* Used by a Session to store and retrieve messages with a sequence number higher than expected.
24+
*
25+
* @see quickfix.Session
26+
*/
27+
interface MessageQueue {
28+
29+
/**
30+
* Enqueue a message.
31+
*
32+
* @param sequence the sequence number
33+
* @param message the FIX message
34+
*/
35+
void enqueue(int sequence, Message message);
36+
37+
/**
38+
* Dequeue a message with given sequence number.
39+
*
40+
* @param sequence the sequence number
41+
* @return message the FIX message
42+
*/
43+
Message dequeue(int sequence);
44+
45+
/**
46+
* Remove messages from queue up to a given sequence number.
47+
*
48+
* @param seqnum up to which sequence number messages should be deleted
49+
*/
50+
default void dequeueMessagesUpTo(int seqnum) {
51+
for (int i = 1; i < seqnum; i++) {
52+
dequeue(i);
53+
}
54+
}
55+
56+
/**
57+
* Clear the queue.
58+
*/
59+
void clear();
60+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*******************************************************************************
2+
* Copyright (c) quickfixengine.org All rights reserved.
3+
*
4+
* This file is part of the QuickFIX FIX Engine
5+
*
6+
* This file may be distributed under the terms of the quickfixengine.org
7+
* license as defined by quickfixengine.org and appearing in the file
8+
* LICENSE included in the packaging of this file.
9+
*
10+
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
11+
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
12+
* PARTICULAR PURPOSE.
13+
*
14+
* See http://www.quickfixengine.org/LICENSE for licensing information.
15+
*
16+
* Contact ask@quickfixengine.org if any conditions of this licensing
17+
* are not clear to you.
18+
******************************************************************************/
19+
20+
package quickfix;
21+
22+
/**
23+
* Used by a Session to create a message queue implementation.
24+
*
25+
* @see Session
26+
*/
27+
public interface MessageQueueFactory {
28+
29+
/**
30+
* Creates a message queue implementation.
31+
*
32+
* @param sessionID the session ID, often used to access session configurations
33+
* @return the message queue implementation
34+
*/
35+
MessageQueue create(SessionID sessionID);
36+
}

quickfixj-core/src/main/java/quickfix/Session.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,30 @@ public class Session implements Closeable {
497497
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
498498
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
499499
boolean allowPossDup) {
500+
this(application, messageStoreFactory, new InMemoryMessageQueueFactory(), sessionID, dataDictionaryProvider, sessionSchedule, logFactory,
501+
messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false,
502+
false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5},
503+
false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false,
504+
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup);
505+
}
506+
507+
Session(Application application, MessageStoreFactory messageStoreFactory, MessageQueueFactory messageQueueFactory,
508+
SessionID sessionID, DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule,
509+
LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval,
510+
boolean checkLatency, int maxLatency, UtcTimestampPrecision timestampPrecision,
511+
boolean resetOnLogon, boolean resetOnLogout, boolean resetOnDisconnect,
512+
boolean refreshOnLogon, boolean checkCompID,
513+
boolean redundantResentRequestsAllowed, boolean persistMessages,
514+
boolean useClosedRangeForResend, double testRequestDelayMultiplier,
515+
DefaultApplVerID senderDefaultApplVerID, boolean validateSequenceNumbers,
516+
int[] logonIntervals, boolean resetOnError, boolean disconnectOnError,
517+
boolean disableHeartBeatCheck, boolean rejectGarbledMessage, boolean rejectInvalidMessage,
518+
boolean rejectMessageOnUnhandledException, boolean requiresOrigSendingTime,
519+
boolean forceResendWhenCorruptedStore, Set<InetAddress> allowedRemoteAddresses,
520+
boolean validateIncomingMessage, int resendRequestChunkSize,
521+
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
522+
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
523+
boolean allowPossDup) {
500524
this.application = application;
501525
this.sessionID = sessionID;
502526
this.sessionSchedule = sessionSchedule;
@@ -543,8 +567,13 @@ public class Session implements Closeable {
543567
addStateListener((SessionStateListener) messageStore);
544568
}
545569

570+
final MessageQueue messageQueue = messageQueueFactory.create(sessionID);
571+
if (messageQueue instanceof SessionStateListener) {
572+
addStateListener((SessionStateListener) messageQueue);
573+
}
574+
546575
state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0,
547-
messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier);
576+
messageStore, messageQueue, testRequestDelayMultiplier, heartBeatTimeoutMultiplier);
548577

549578
registerSession(this);
550579

@@ -1533,7 +1562,7 @@ private void nextSequenceReset(Message sequenceReset) throws IOException, Reject
15331562
}
15341563
// QFJ-728: newSequence will be the seqnum of the next message so we
15351564
// delete all older messages from the queue since they are effectively skipped.
1536-
state.dequeueMessagesUpTo(newSequence);
1565+
state.getMessageQueue().dequeueMessagesUpTo(newSequence);
15371566
} else if (newSequence < getExpectedTargetNum()) {
15381567

15391568
getLog().onErrorEvent(
@@ -2107,7 +2136,7 @@ public void disconnect(String reason, boolean logError) throws IOException {
21072136
state.setLogoutReceived(false);
21082137
state.setResetReceived(false);
21092138
state.setResetSent(false);
2110-
state.clearQueue();
2139+
state.getMessageQueue().clear();
21112140
state.clearLogoutReason();
21122141
state.setResendRange(0, 0);
21132142

@@ -2399,7 +2428,7 @@ private void nextQueued() throws FieldNotFound, RejectLogon, IncorrectDataFormat
23992428

24002429
private boolean nextQueued(int num) throws FieldNotFound, RejectLogon, IncorrectDataFormat,
24012430
IncorrectTagValue, UnsupportedMessageType, IOException, InvalidMessage {
2402-
final Message msg = state.dequeue(num);
2431+
final Message msg = state.getMessageQueue().dequeue(num);
24032432
if (msg != null) {
24042433
getLog().onEvent("Processing queued message: " + num);
24052434

@@ -2668,7 +2697,7 @@ private boolean sendRaw(Message message, int num) {
26682697
}
26692698

26702699
private void enqueueMessage(final Message msg, final int msgSeqNum) {
2671-
state.enqueue(msgSeqNum, msg);
2700+
state.getMessageQueue().enqueue(msgSeqNum, msg);
26722701
getLog().onEvent("Enqueued at pos " + msgSeqNum + ": " + msg);
26732702
}
26742703

0 commit comments

Comments
 (0)