Skip to content

Commit b68825e

Browse files
committed
QFJ-854: Race discovered in SingleThreadedEventHandlingStrategyTest
- extended SingleThreadedEventHandlingStrategy and Test as suggested by Marcin L QFJ-849: SocketInitiator does not stop properly - changed SocketAcceptor/Initiator: - put END_OF_STREAM message into event queue outside of synchronized block to prevent locking on stop - moved setting of isStarted flag to an earlier position because flag would not be set for a connector that is started in blocking mode
1 parent bc4118b commit b68825e

File tree

4 files changed

+215
-15
lines changed

4 files changed

+215
-15
lines changed

quickfixj-core/src/main/java/quickfix/SocketAcceptor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,11 @@ private void initialize(boolean blockInThread) throws ConfigError {
8686
startAcceptingConnections();
8787
if (blockInThread) {
8888
eventHandlingStrategy.blockInThread();
89+
isStarted = Boolean.TRUE;
8990
} else {
91+
isStarted = Boolean.TRUE;
9092
eventHandlingStrategy.block();
9193
}
92-
isStarted = Boolean.TRUE;
9394
} else {
9495
log.warn("Ignored attempt to start already running SocketAcceptor.");
9596
}
@@ -103,9 +104,9 @@ public void stop() {
103104

104105
@Override
105106
public void stop(boolean forceDisconnect) {
107+
eventHandlingStrategy.stopHandlingMessages();
106108
synchronized (lock) {
107109
try {
108-
eventHandlingStrategy.stopHandlingMessages();
109110
try {
110111
stopAcceptingConnections();
111112
} catch (ConfigError e) {

quickfixj-core/src/main/java/quickfix/SocketInitiator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ public void stop() {
9696

9797
@Override
9898
public void stop(boolean forceDisconnect) {
99+
eventHandlingStrategy.stopHandlingMessages();
99100
synchronized (lock) {
100101
try {
101-
eventHandlingStrategy.stopHandlingMessages();
102102
logoutAllSessions(forceDisconnect);
103103
stopInitiators();
104104
} finally {
@@ -118,10 +118,11 @@ private void initialize(boolean blockInThread) throws ConfigError {
118118
startInitiators();
119119
if (blockInThread) {
120120
eventHandlingStrategy.blockInThread();
121+
isStarted = Boolean.TRUE;
121122
} else {
123+
isStarted = Boolean.TRUE;
122124
eventHandlingStrategy.block();
123125
}
124-
isStarted = Boolean.TRUE;
125126
} else {
126127
log.warn("Ignored attempt to start already running SocketInitiator.");
127128
}

quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void onMessage(Session quickfixSession, Message message) {
5353
try {
5454
eventQueue.put(new SessionMessageEvent(quickfixSession, message));
5555
} catch (InterruptedException e) {
56+
isStopped = true;
5657
throw new RuntimeException(e);
5758
}
5859
}

quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java

Lines changed: 208 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,27 @@
2222
import java.lang.management.ManagementFactory;
2323
import java.lang.management.ThreadInfo;
2424
import java.lang.management.ThreadMXBean;
25-
import static junit.framework.Assert.assertEquals;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.concurrent.CountDownLatch;
28+
29+
import org.junit.Assert;
2630
import org.junit.Test;
31+
import org.mockito.Mockito;
32+
33+
import quickfix.Application;
2734
import quickfix.ConfigError;
35+
import quickfix.DefaultMessageFactory;
2836
import quickfix.DefaultSessionFactory;
37+
import quickfix.FixVersions;
2938
import quickfix.MemoryStoreFactory;
3039
import quickfix.RuntimeError;
3140
import quickfix.ScreenLogFactory;
3241
import quickfix.SessionFactory;
42+
import quickfix.SessionID;
3343
import quickfix.SessionSettings;
44+
import quickfix.SocketAcceptor;
45+
import quickfix.SocketInitiator;
3446
import quickfix.UnitTestApplication;
3547

3648
/**
@@ -46,13 +58,12 @@ public class SingleThreadedEventHandlingStrategyTest {
4658
public void testDoubleStart() throws Exception {
4759
SingleThreadedEventHandlingStrategy ehs = null;
4860
try {
49-
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
5061
SessionSettings settings = new SessionSettings();
5162
SessionConnector connector = new SessionConnectorUnderTest(settings, sessionFactory);
5263
ehs = new SingleThreadedEventHandlingStrategy(connector, 1000);
5364
ehs.blockInThread();
5465
ehs.blockInThread();
55-
checkThreads(bean);
66+
assertQFJMessageProcessorThreads(1);
5667
} finally {
5768
if ( ehs != null ) {
5869
ehs.stopHandlingMessages(true);
@@ -66,14 +77,13 @@ public void testMultipleStart() throws Exception {
6677
SingleThreadedEventHandlingStrategy ehs = null;
6778

6879
try {
69-
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
7080
SessionSettings settings = new SessionSettings();
7181
SessionConnector connector = new SessionConnectorUnderTest(settings, sessionFactory);
7282
ehs = new SingleThreadedEventHandlingStrategy(connector, 1000);
7383
for (int i = 0; i < 20; i++) {
7484
ehs.blockInThread();
7585
}
76-
checkThreads(bean);
86+
assertQFJMessageProcessorThreads(1);
7787
} finally {
7888
if (ehs != null) {
7989
ehs.stopHandlingMessages(true);
@@ -85,15 +95,14 @@ public void testMultipleStart() throws Exception {
8595
public void testStartStop() throws Exception {
8696
SingleThreadedEventHandlingStrategy ehs = null;
8797
try {
88-
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
8998
SessionSettings settings = new SessionSettings();
9099
SessionConnector connector = new SessionConnectorUnderTest(settings, sessionFactory);
91100
ehs = new SingleThreadedEventHandlingStrategy(connector, 1000);
92101
ehs.blockInThread();
93102
ehs.stopHandlingMessages();
94103
Thread.sleep(500);
95104
ehs.blockInThread();
96-
checkThreads(bean);
105+
assertQFJMessageProcessorThreads(1);
97106
} finally {
98107
if ( ehs != null ) {
99108
ehs.stopHandlingMessages(true);
@@ -107,13 +116,12 @@ public void testMultipleStartStop() throws Exception {
107116
SingleThreadedEventHandlingStrategy ehs = null;
108117
SessionSettings settings = new SessionSettings();
109118
SessionConnector connector = new SessionConnectorUnderTest(settings, sessionFactory);
110-
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
111119

112120
for (int i = 0; i < 20; i++) {
113121
try {
114122
ehs = new SingleThreadedEventHandlingStrategy(connector, 1000);
115123
ehs.blockInThread();
116-
checkThreads(bean);
124+
assertQFJMessageProcessorThreads(1);
117125
} finally {
118126
if (ehs != null) {
119127
ehs.stopHandlingMessages(true);
@@ -122,7 +130,182 @@ public void testMultipleStartStop() throws Exception {
122130
}
123131
}
124132

125-
private void checkThreads(ThreadMXBean bean) {
133+
@Test
134+
public void shouldCleanUpAcceptorQFJMessageProcessorThreadAfterInterrupt() throws Exception {
135+
final SocketAcceptor acceptor = createAcceptor(0);
136+
final CountDownLatch acceptorCountDownLatch = new CountDownLatch(1);
137+
138+
Thread acceptorThread = new Thread("Acceptor-Thread") {
139+
@Override
140+
public void run() {
141+
try {
142+
acceptor.start();
143+
acceptorCountDownLatch.await();
144+
145+
Thread.currentThread().interrupt();
146+
} catch (Exception e) {
147+
e.printStackTrace();
148+
} finally {
149+
acceptor.stop();
150+
}
151+
}
152+
};
153+
154+
acceptorThread.setDaemon(true);
155+
acceptorThread.start();
156+
157+
Thread.sleep(1000);
158+
acceptorCountDownLatch.countDown();
159+
160+
Thread.sleep(1000);
161+
assertQFJMessageProcessorThreads(0);
162+
163+
acceptorThread.join();
164+
}
165+
166+
@Test
167+
public void shouldCleanUpInitiatorQFJMessageProcessorThreadAfterInterrupt() throws Exception {
168+
final SocketInitiator initiator = createInitiator(0);
169+
final CountDownLatch initiatorCountDownLatch = new CountDownLatch(1);
170+
171+
Thread initiatorThread = new Thread("Initiator-Thread") {
172+
@Override
173+
public void run() {
174+
try {
175+
initiator.start();
176+
initiatorCountDownLatch.await();
177+
178+
Thread.currentThread().interrupt();
179+
} catch (Exception e) {
180+
e.printStackTrace();
181+
} finally {
182+
initiator.stop();
183+
}
184+
}
185+
};
186+
187+
initiatorThread.setDaemon(true);
188+
initiatorThread.start();
189+
190+
Thread.sleep(1000);
191+
initiatorCountDownLatch.countDown();
192+
193+
Thread.sleep(1000);
194+
assertQFJMessageProcessorThreads(0);
195+
196+
initiatorThread.join();
197+
}
198+
199+
@Test(timeout = 10000)
200+
public void shouldCleanUpAcceptorQFJMessageProcessorThreadAfterStop() throws Exception {
201+
final SocketAcceptor acceptor = createAcceptor(1);
202+
203+
Thread acceptorThread = new Thread("Acceptor-Thread") {
204+
@Override
205+
public void run() {
206+
try {
207+
acceptor.block();
208+
} catch (Exception e) {
209+
e.printStackTrace();
210+
} finally {
211+
acceptor.stop();
212+
}
213+
}
214+
};
215+
216+
acceptorThread.setDaemon(true);
217+
acceptorThread.start();
218+
219+
Thread.sleep(1000);
220+
acceptor.stop();
221+
222+
Thread.sleep(1000);
223+
assertQFJMessageProcessorThreads(0);
224+
225+
acceptorThread.join();
226+
}
227+
228+
@Test(timeout = 10000)
229+
public void shouldCleanUpInitiatorQFJMessageProcessorThreadAfterStop() throws Exception {
230+
final SocketInitiator initiator = createInitiator(1);
231+
232+
Thread initiatorThread = new Thread("Initiator-Thread") {
233+
@Override
234+
public void run() {
235+
try {
236+
initiator.block();
237+
} catch (Exception e) {
238+
e.printStackTrace();
239+
} finally {
240+
initiator.stop();
241+
}
242+
}
243+
};
244+
245+
initiatorThread.setDaemon(true);
246+
initiatorThread.start();
247+
248+
Thread.sleep(1000);
249+
initiator.stop();
250+
251+
Thread.sleep(1000);
252+
assertQFJMessageProcessorThreads(0);
253+
254+
initiatorThread.join();
255+
}
256+
257+
private SocketAcceptor createAcceptor(int i) throws ConfigError {
258+
Map<Object, Object> acceptorProperties = new HashMap<Object, Object>();
259+
acceptorProperties.put("ConnectionType", "acceptor");
260+
acceptorProperties.put("HeartBtInt", "5");
261+
acceptorProperties.put("SocketAcceptHost", "localhost");
262+
acceptorProperties.put("SocketAcceptPort", String.valueOf(9999 + i) );
263+
acceptorProperties.put("ReconnectInterval", "2");
264+
acceptorProperties.put("StartTime", "00:00:00");
265+
acceptorProperties.put("EndTime", "00:00:00");
266+
acceptorProperties.put("SenderCompID", "ISLD");
267+
acceptorProperties.put("TargetCompID", "TW");
268+
269+
SessionSettings acceptorSettings = new SessionSettings();
270+
acceptorSettings.set(acceptorProperties);
271+
272+
SessionID acceptorSessionId = new SessionID(FixVersions.BEGINSTRING_FIX44, "ISLD", "TW");
273+
acceptorSettings.setString(acceptorSessionId, "BeginString", FixVersions.BEGINSTRING_FIX44);
274+
acceptorSettings.setString(acceptorSessionId, "DataDictionary", "FIX44.xml");
275+
276+
SocketAcceptor acceptor = new SocketAcceptor(Mockito.mock(Application.class), new MemoryStoreFactory(),
277+
acceptorSettings, new DefaultMessageFactory());
278+
279+
return acceptor;
280+
}
281+
282+
public SocketInitiator createInitiator(int i) throws ConfigError {
283+
Map<Object, Object> acceptorProperties = new HashMap<Object, Object>();
284+
acceptorProperties.put("ConnectionType", "initiator");
285+
acceptorProperties.put("HeartBtInt", "5");
286+
acceptorProperties.put("SocketConnectHost", "localhost");
287+
acceptorProperties.put("SocketConnectPort", String.valueOf(9999 + i) );
288+
acceptorProperties.put("ReconnectInterval", "2");
289+
acceptorProperties.put("StartTime", "00:00:00");
290+
acceptorProperties.put("EndTime", "00:00:00");
291+
acceptorProperties.put("SenderCompID", "TW");
292+
acceptorProperties.put("TargetCompID", "ISLD");
293+
294+
SessionSettings acceptorSettings = new SessionSettings();
295+
acceptorSettings.set(acceptorProperties);
296+
297+
SessionID acceptorSessionId = new SessionID(FixVersions.BEGINSTRING_FIX44, "TW", "ISLD");
298+
acceptorSettings.setString(acceptorSessionId, "BeginString", FixVersions.BEGINSTRING_FIX44);
299+
acceptorSettings.setString(acceptorSessionId, "DataDictionary", "FIX44.xml");
300+
301+
SocketInitiator initiator = new SocketInitiator(Mockito.mock(Application.class), new MemoryStoreFactory(),
302+
acceptorSettings, new DefaultMessageFactory());
303+
304+
return initiator;
305+
}
306+
307+
private void assertQFJMessageProcessorThreads(int expected) {
308+
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
126309
ThreadInfo[] dumpAllThreads = bean.dumpAllThreads(false, false);
127310
int qfjMPThreads = 0;
128311
for (ThreadInfo threadInfo : dumpAllThreads) {
@@ -131,7 +314,20 @@ private void checkThreads(ThreadMXBean bean) {
131314
qfjMPThreads++;
132315
}
133316
}
134-
assertEquals("Exactly one 'QFJ Message Processor' thread expected", 1, qfjMPThreads);
317+
if (qfjMPThreads > 1) {
318+
for (ThreadInfo threadInfo : dumpAllThreads) {
319+
if (SingleThreadedEventHandlingStrategy.MESSAGE_PROCESSOR_THREAD_NAME.equals(threadInfo
320+
.getThreadName())) {
321+
System.out.println( threadInfo.getThreadName() + " " + threadInfo.getThreadState());
322+
StackTraceElement[] stackTrace = threadInfo.getStackTrace();
323+
for (StackTraceElement stackTrace1 : stackTrace) {
324+
System.out.println( " " + stackTrace1 );
325+
}
326+
}
327+
}
328+
}
329+
330+
Assert.assertEquals("Expected " + expected + " 'QFJ Message Processor' thread(s)", expected, qfjMPThreads);
135331
}
136332

137333
private static class SessionConnectorUnderTest extends SessionConnector {
@@ -158,3 +354,4 @@ public void block() throws ConfigError, RuntimeError {
158354
}
159355

160356
}
357+

0 commit comments

Comments
 (0)