Skip to content

Commit 21bec09

Browse files
authored
Corrections to SingleThreadedEventHandlingStrategy, SocketAcceptor/Initiator (#201)
* Corrections to SingleThreadedEventHandlingStrategy and SocketAcceptor/Initiator. ** Reverted changes from #192 since we actually want to process all messages on stop(). * Return from block() when stopped. ** It does not make sense to call getMessage() in that case. * minor cleanup in ThreadPerSessionEventHandlingStrategy * Added javadoc and unit test... ** to ensure that the session timer is not stopped by Message Processor thread when using stopHandlingMessages(true) * Changed SocketAcceptor and SocketInitiator to use SingleThreadedEventHandlingStrategy.stopHandlingMessages(true). ** Otherwise it could happen on quick restarts that the Message Processor thread stopped the SessionTimer concurrently which lead to an unresponsive FIX session, i.e. no Logon sent, no Heartbeats.
1 parent d22daeb commit 21bec09

File tree

6 files changed

+93
-69
lines changed

6 files changed

+93
-69
lines changed

quickfixj-core/src/main/java/quickfix/SocketAcceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void stop(boolean forceDisconnect) {
122122
stopSessionTimer();
123123
} finally {
124124
try {
125-
eventHandlingStrategy.stopHandlingMessages();
125+
eventHandlingStrategy.stopHandlingMessages(true);
126126
} finally {
127127
Session.unregisterSessions(getSessions(), true);
128128
clearConnectorSessions();

quickfixj-core/src/main/java/quickfix/SocketInitiator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void stop(boolean forceDisconnect) {
134134
stopInitiators();
135135
} finally {
136136
try {
137-
eventHandlingStrategy.stopHandlingMessages();
137+
eventHandlingStrategy.stopHandlingMessages(true);
138138
} finally {
139139
Session.unregisterSessions(getSessions(), true);
140140
clearConnectorSessions();

quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ protected void stopSessionTimer() {
328328
}
329329
}
330330

331+
// visible for testing
332+
boolean checkSessionTimerRunning() {
333+
if ( sessionTimerFuture != null ) {
334+
return !sessionTimerFuture.isDone();
335+
}
336+
return false;
337+
}
338+
331339
protected ScheduledExecutorService getScheduledExecutorService() {
332340
return scheduledExecutorService;
333341
}

quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ private void block() {
9999
sessionConnector.stopSessionTimer();
100100
// reset the stoptime
101101
stopTime = 0;
102-
return;
103102
}
103+
return;
104104
}
105105
}
106106
try {
@@ -167,16 +167,27 @@ private synchronized void startHandlingMessages() {
167167
isStopped = false;
168168
}
169169

170+
/**
171+
* Stops processing of messages without waiting for message processing
172+
* thread to finish.
173+
*
174+
* It is advised to call stopHandlingMessages(true) instead of this method.
175+
*/
170176
public synchronized void stopHandlingMessages() {
171177
for (Session session : sessionConnector.getSessionMap().values()) {
172178
onMessage(session, END_OF_STREAM);
173179
}
174180
isStopped = true;
175181
}
176182

183+
/**
184+
* Stops processing of messages and optionally waits for message processing
185+
* thread to finish.
186+
*
187+
* @param join true to wait for thread to finish
188+
*/
177189
public void stopHandlingMessages(boolean join) {
178190
stopHandlingMessages();
179-
messageProcessingThread.interrupt();
180191
if (join) {
181192
try {
182193
messageProcessingThread.join();
@@ -227,12 +238,6 @@ public void start() {
227238
executor.execute(wrapper);
228239
}
229240

230-
public void interrupt() {
231-
if (executor instanceof DedicatedThreadExecutor) {
232-
((DedicatedThreadExecutor)executor).interrupt();
233-
}
234-
}
235-
236241
/**
237242
* Provides the Thread::join and Thread::isAlive semantics on the nested Runnable.
238243
*/
@@ -290,12 +295,6 @@ public void execute(Runnable command) {
290295
thread.setDaemon(true);
291296
thread.start();
292297
}
293-
294-
public void interrupt() {
295-
if (thread != null) {
296-
thread.interrupt();
297-
}
298-
}
299298
}
300299

301300
}

quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -113,59 +113,58 @@ public void stopDispatcherThreads() {
113113
}
114114
}
115115

116-
/**
117-
* A stand-in for the Thread class that delegates to an Executor.
118-
* Implements all the API required by pre-existing QFJ code.
119-
*/
120-
protected static abstract class ThreadAdapter implements Runnable {
121-
122-
private final Executor executor;
123-
private final String name;
124-
125-
public ThreadAdapter(String name, Executor executor) {
126-
this.name = name;
127-
this.executor = executor != null ? executor : new DedicatedThreadExecutor(name);
128-
}
129-
130-
public void start() {
131-
executor.execute(this);
132-
}
133-
134-
@Override
135-
public final void run() {
136-
Thread currentThread = Thread.currentThread();
137-
String threadName = currentThread.getName();
138-
try {
139-
if (!name.equals(threadName)) {
140-
currentThread.setName(name + " (" + threadName + ")");
141-
}
142-
doRun();
143-
} finally {
144-
currentThread.setName(threadName);
145-
}
146-
}
147-
148-
abstract void doRun();
149-
150-
/**
151-
* An Executor that uses it's own dedicated Thread.
152-
* Provides equivalent behavior to the prior non-Executor approach.
153-
*/
154-
static final class DedicatedThreadExecutor implements Executor {
155-
156-
private final String name;
157-
158-
DedicatedThreadExecutor(String name) {
159-
this.name = name;
160-
}
161-
162-
@Override
163-
public void execute(Runnable command) {
164-
new Thread(command, name).start();
165-
}
166-
167-
}
116+
/**
117+
* A stand-in for the Thread class that delegates to an Executor.
118+
* Implements all the API required by pre-existing QFJ code.
119+
*/
120+
protected static abstract class ThreadAdapter implements Runnable {
121+
122+
private final Executor executor;
123+
private final String name;
168124

125+
public ThreadAdapter(String name, Executor executor) {
126+
this.name = name;
127+
this.executor = executor != null ? executor : new DedicatedThreadExecutor(name);
128+
}
129+
130+
public void start() {
131+
executor.execute(this);
132+
}
133+
134+
@Override
135+
public final void run() {
136+
Thread currentThread = Thread.currentThread();
137+
String threadName = currentThread.getName();
138+
try {
139+
if (!name.equals(threadName)) {
140+
currentThread.setName(name + " (" + threadName + ")");
141+
}
142+
doRun();
143+
} finally {
144+
currentThread.setName(threadName);
145+
}
146+
}
147+
148+
abstract void doRun();
149+
150+
/**
151+
* An Executor that uses its own dedicated Thread. Provides equivalent
152+
* behavior to the prior non-Executor approach.
153+
*/
154+
static final class DedicatedThreadExecutor implements Executor {
155+
156+
private final String name;
157+
158+
DedicatedThreadExecutor(String name) {
159+
this.name = name;
160+
}
161+
162+
@Override
163+
public void execute(Runnable command) {
164+
new Thread(command, name).start();
165+
}
166+
167+
}
169168
}
170169

171170
protected class MessageDispatchingThread extends ThreadAdapter {
@@ -226,7 +225,7 @@ void doRun() {
226225
}
227226
}
228227
if (!messages.isEmpty()) {
229-
final List<Message> tempList = new ArrayList<>();
228+
final List<Message> tempList = new ArrayList<>(messages.size());
230229
queueTracker.drainTo(tempList);
231230
for (Message message : tempList) {
232231
try {

quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.Map;
4646
import java.util.concurrent.CountDownLatch;
4747
import org.junit.AfterClass;
48+
import static org.junit.Assert.assertTrue;
4849

4950
/**
5051
*
@@ -149,6 +150,23 @@ public void testMultipleStartStop() throws Exception {
149150
}
150151
}
151152

153+
/**
154+
* During quick restarts: make sure that session timer is started and not stopped via
155+
* block() method called from Message Processor thread.
156+
*/
157+
@Test
158+
public void testMultipleStartSessionTimer() throws Exception {
159+
SessionSettings settings = new SessionSettings();
160+
SessionConnector connector = new SessionConnectorUnderTest(settings, sessionFactory);
161+
ehs = new SingleThreadedEventHandlingStrategy(connector, 1000);
162+
for (int i = 0; i < 1000; i++) {
163+
connector.startSessionTimer();
164+
ehs.blockInThread();
165+
assertTrue(connector.checkSessionTimerRunning());
166+
ehs.stopHandlingMessages(true);
167+
}
168+
}
169+
152170
@Test
153171
public void shouldCleanUpAcceptorQFJMessageProcessorThreadAfterInterrupt() throws Exception {
154172
assertQFJMessageProcessorThreads(0);

0 commit comments

Comments
 (0)