Skip to content

Commit 1126a4f

Browse files
authored
Consider ContinueInitializationOnError on creation of IoSessionInitiator/IoAcceptor (#376)
* Consider ContinueInitializationOnError on creation of IoSessionInitiator/IoAcceptor - Now SSL-related errors can be ignored and hence will not prevent initialization of other sessions on the same Initiator/Acceptor - Corrected documentation. - changed logging level from ERROR to WARN because the sessions do not fail when `ContinueInitializationOnError=Y`
1 parent 3b1d0fe commit 1126a4f

File tree

7 files changed

+166
-52
lines changed

7 files changed

+166
-52
lines changed

quickfixj-core/src/main/doc/usermanual/usage/configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,12 @@ <H3>QuickFIX Settings</H3>
601601
</TD>
602602
<TD> empty, ie all remote addresses are allowed </TD>
603603
</TR>
604+
<TR ALIGN="left" VALIGN="middle">
605+
<TD> <I>AcceptorTemplate</I> </TD>
606+
<TD> Designates a template Acceptor session. See <a href="acceptor_dynamic.html">Dynamic Acceptor Sessions</a></TD>
607+
<TD> Y<BR>N</TD>
608+
<TD>N</TD>
609+
</TR>
604610

605611
<TR ALIGN="center" VALIGN="middle">
606612
<TD COLSPAN="4" class="subsection"><A NAME="Security">Secure Communication Options</A></TD>

quickfixj-core/src/main/java/quickfix/Acceptor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* Contact ask@quickfixengine.org if any conditions of this licensing
1717
* are not clear to you.
1818
******************************************************************************/
19-
2019
package quickfix;
2120

2221
/**
@@ -25,7 +24,8 @@
2524
public interface Acceptor extends Connector {
2625

2726
/**
28-
* Acceptor setting specifying the socket protocol used to accept connections.
27+
* Acceptor setting specifying the socket protocol used to accept
28+
* connections.
2929
*/
3030
String SETTING_SOCKET_ACCEPT_PROTOCOL = "SocketAcceptProtocol";
3131

@@ -35,12 +35,13 @@ public interface Acceptor extends Connector {
3535
String SETTING_SOCKET_ACCEPT_PORT = "SocketAcceptPort";
3636

3737
/**
38-
* Acceptor setting specifying local IP interface address for accepting connections.
38+
* Acceptor setting specifying local IP interface address for accepting
39+
* connections.
3940
*/
4041
String SETTING_SOCKET_ACCEPT_ADDRESS = "SocketAcceptAddress";
4142

4243
/**
43-
* Acceptor setting specifying local IP interface address for accepting connections.
44+
* Acceptor setting specifying a template acceptor session.
4445
*/
4546
String SETTING_ACCEPTOR_TEMPLATE = "AcceptorTemplate";
4647
}

quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,10 +460,14 @@ public static void closeManagedSessionsAndDispose(IoService ioService, boolean a
460460
}
461461
}
462462

463-
protected boolean isContinueInitOnError() throws ConfigError, FieldConvertError {
463+
protected boolean isContinueInitOnError() {
464464
boolean continueInitOnError = false;
465465
if (settings.isSetting(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR)) {
466-
continueInitOnError = settings.getBool(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR);
466+
try {
467+
continueInitOnError = settings.getBool(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR);
468+
} catch (ConfigError | FieldConvertError ex) {
469+
// ignore and return default
470+
}
467471
}
468472
return continueInitOnError;
469473
}

quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import quickfix.mina.ssl.SSLSupport;
5050

5151
import javax.net.ssl.SSLContext;
52+
import java.io.IOException;
5253
import java.net.SocketAddress;
5354
import java.security.GeneralSecurityException;
5455
import java.util.Collection;
@@ -93,12 +94,13 @@ protected AbstractSocketAcceptor(Application application,
9394
// TODO SYNC Does this method really need synchronization?
9495
protected synchronized void startAcceptingConnections() throws ConfigError {
9596

96-
SocketAddress address = null;
97-
try {
98-
createSessions(getSettings());
99-
startSessionTimer();
97+
boolean continueInitOnError = isContinueInitOnError();
98+
createSessions(getSettings(), continueInitOnError);
99+
startSessionTimer();
100100

101-
for (AcceptorSocketDescriptor socketDescriptor : socketDescriptorForAddress.values()) {
101+
SocketAddress address = null;
102+
for (AcceptorSocketDescriptor socketDescriptor : socketDescriptorForAddress.values()) {
103+
try {
102104
address = socketDescriptor.getAddress();
103105
IoAcceptor ioAcceptor = getIoAcceptor(socketDescriptor);
104106
CompositeIoFilterChainBuilder ioFilterChainBuilder = new CompositeIoFilterChainBuilder(getIoFilterChainBuilder());
@@ -114,12 +116,14 @@ protected synchronized void startAcceptingConnections() throws ConfigError {
114116
ioAcceptor.setCloseOnDeactivation(false);
115117
ioAcceptor.bind(socketDescriptor.getAddress());
116118
log.info("Listening for connections at {} for session(s) {}", address, socketDescriptor.getAcceptedSessions().keySet());
119+
} catch (IOException | GeneralSecurityException | ConfigError e) {
120+
if (continueInitOnError) {
121+
log.warn("error during session initialization for session(s) {}, continuing...", socketDescriptor.getAcceptedSessions().keySet(), e);
122+
} else {
123+
log.error("Cannot start acceptor session for {}, error: {}", address, e);
124+
throw new RuntimeError(e);
125+
}
117126
}
118-
} catch (FieldConvertError e) {
119-
throw new ConfigError(e);
120-
} catch (Exception e) {
121-
log.error("Cannot start acceptor session for {}, error: {}", address, e);
122-
throw new RuntimeError(e);
123127
}
124128
}
125129

@@ -213,35 +217,40 @@ private boolean equals(Object object1, Object object2) {
213217
return object1 == null ? object2 == null : object1.equals(object2);
214218
}
215219

216-
private void createSessions(SessionSettings settings) throws ConfigError, FieldConvertError {
220+
private void createSessions(SessionSettings settings, boolean continueInitOnError) throws ConfigError {
217221
Map<SessionID, Session> allSessions = new HashMap<>();
218-
boolean continueInitOnError = isContinueInitOnError();
219-
220222
for (Iterator<SessionID> i = settings.sectionIterator(); i.hasNext();) {
221223
SessionID sessionID = i.next();
222-
String connectionType = settings.getString(sessionID,
223-
SessionFactory.SETTING_CONNECTION_TYPE);
224+
try {
225+
String connectionType = null;
226+
if (settings.isSetting(sessionID, SessionFactory.SETTING_CONNECTION_TYPE)) {
227+
connectionType = settings.getString(sessionID,
228+
SessionFactory.SETTING_CONNECTION_TYPE);
229+
}
224230

225-
boolean isTemplate = false;
226-
if (settings.isSetting(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE)) {
227-
isTemplate = settings.getBool(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE);
228-
}
231+
if (SessionFactory.ACCEPTOR_CONNECTION_TYPE.equals(connectionType)) {
232+
boolean isTemplate = false;
233+
if (settings.isSetting(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE)) {
234+
try {
235+
isTemplate = settings.getBool(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE);
236+
} catch (FieldConvertError | ConfigError ex) {
237+
// ignore and use default
238+
}
239+
}
229240

230-
if (connectionType.equals(SessionFactory.ACCEPTOR_CONNECTION_TYPE)) {
231-
try {
232-
AcceptorSocketDescriptor descriptor = getAcceptorSocketDescriptor(settings, sessionID);
233241
if (!isTemplate) {
242+
AcceptorSocketDescriptor descriptor = getAcceptorSocketDescriptor(settings, sessionID);
234243
Session session = sessionFactory.create(sessionID, settings);
235244
descriptor.acceptSession(session);
236245
allSessions.put(sessionID, session);
237246
}
238-
} catch (Throwable t) {
239-
if (continueInitOnError) {
240-
log.error("error during session initialization for {}, continuing...", sessionID, t);
241-
} else {
242-
throw t instanceof ConfigError ? (ConfigError) t : new ConfigError(
243-
"error during session initialization", t);
244-
}
247+
}
248+
} catch (Throwable t) {
249+
if (continueInitOnError) {
250+
log.warn("error during session initialization for {}, continuing...", sessionID, t);
251+
} else {
252+
throw t instanceof ConfigError ? (ConfigError) t : new ConfigError(
253+
"error during session initialization", t);
245254
}
246255
}
247256
}

quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,17 @@ protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessi
102102
protected void createSessionInitiators()
103103
throws ConfigError {
104104
try {
105-
createSessions();
105+
boolean continueInitOnError = isContinueInitOnError();
106+
createSessions(continueInitOnError);
106107
for (final Session session : getSessionMap().values()) {
107-
createInitiator(session);
108+
createInitiator(session, continueInitOnError);
108109
}
109110
} catch (final FieldConvertError e) {
110111
throw new ConfigError(e);
111112
}
112113
}
113114

114-
private void createInitiator(final Session session) throws ConfigError, FieldConvertError {
115+
private void createInitiator(final Session session, final boolean continueInitOnError) throws ConfigError, FieldConvertError {
115116

116117
SessionSettings settings = getSettings();
117118
final SessionID sessionID = session.getSessionID();
@@ -171,14 +172,21 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
171172
}
172173

173174
ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService());
174-
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
175-
socketAddresses, localAddress, reconnectingIntervals,
176-
scheduledExecutorService, networkingOptions,
177-
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
178-
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
179-
180-
initiators.add(ioSessionInitiator);
181-
175+
try {
176+
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
177+
socketAddresses, localAddress, reconnectingIntervals,
178+
scheduledExecutorService, networkingOptions,
179+
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
180+
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
181+
182+
initiators.add(ioSessionInitiator);
183+
} catch (ConfigError e) {
184+
if (continueInitOnError) {
185+
log.warn("error during session initialization for {}, continuing...", sessionID, e);
186+
} else {
187+
throw e;
188+
}
189+
}
182190
}
183191

184192
// QFJ-482
@@ -201,10 +209,8 @@ private SocketAddress getLocalAddress(SessionSettings settings, final SessionID
201209
return localAddress;
202210
}
203211

204-
private void createSessions() throws ConfigError, FieldConvertError {
212+
private void createSessions(boolean continueInitOnError) throws ConfigError, FieldConvertError {
205213
final SessionSettings settings = getSettings();
206-
boolean continueInitOnError = isContinueInitOnError();
207-
208214
final Map<SessionID, Session> initiatorSessions = new HashMap<>();
209215
for (final Iterator<SessionID> i = settings.sectionIterator(); i.hasNext();) {
210216
final SessionID sessionID = i.next();
@@ -216,7 +222,7 @@ private void createSessions() throws ConfigError, FieldConvertError {
216222
}
217223
} catch (final Throwable e) {
218224
if (continueInitOnError) {
219-
log.error("error during session initialization for {}, continuing...", sessionID, e);
225+
log.warn("error during session initialization for {}, continuing...", sessionID, e);
220226
} else {
221227
throw e instanceof ConfigError ? (ConfigError) e : new ConfigError(
222228
"error during session initialization", e);
@@ -232,7 +238,7 @@ public void createDynamicSession(SessionID sessionID) throws ConfigError {
232238
try {
233239
Session session = createSession(sessionID);
234240
super.addDynamicSession(session);
235-
createInitiator(session);
241+
createInitiator(session, isContinueInitOnError());
236242
startInitiators();
237243
} catch (final FieldConvertError e) {
238244
throw new ConfigError(e);

quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919

2020
package quickfix;
2121

22+
import org.apache.mina.core.service.IoAcceptor;
23+
import org.apache.mina.util.AvailablePortFinder;
2224
import org.junit.After;
2325
import org.junit.Test;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628
import quickfix.field.MsgType;
2729
import quickfix.mina.ProtocolFactory;
2830
import quickfix.mina.SingleThreadedEventHandlingStrategy;
31+
import quickfix.mina.message.FIXProtocolCodecFactory;
32+
import quickfix.mina.ssl.SSLSupport;
2933

34+
import java.io.IOException;
3035
import java.lang.management.ManagementFactory;
3136
import java.lang.management.ThreadInfo;
3237
import java.lang.management.ThreadMXBean;
@@ -35,7 +40,12 @@
3540
import java.util.concurrent.TimeUnit;
3641
import java.util.logging.Level;
3742

38-
import static org.junit.Assert.*;
43+
import static org.junit.Assert.assertEquals;
44+
import static org.junit.Assert.assertFalse;
45+
import static org.junit.Assert.assertNotNull;
46+
import static org.junit.Assert.assertNull;
47+
import static org.junit.Assert.assertTrue;
48+
import static org.junit.Assert.fail;
3949

4050
/**
4151
* QFJ-643: Unable to restart a stopped acceptor (SocketAcceptor)
@@ -190,6 +200,53 @@ public void testSessionsAreCleanedUpOnThreadedSocketAcceptor() throws Exception
190200
}
191201
}
192202

203+
@Test
204+
public void testAcceptorContinueInitializationOnError() throws ConfigError, InterruptedException, IOException {
205+
final int port = AvailablePortFinder.getNextAvailable();
206+
final int port2 = AvailablePortFinder.getNextAvailable();
207+
final SessionSettings settings = new SessionSettings();
208+
final SessionID sessionId = new SessionID("FIX.4.4", "SENDER", "TARGET");
209+
final SessionID sessionId2 = new SessionID("FIX.4.4", "FOO", "BAR");
210+
final SessionID sessionId3 = new SessionID("FIX.4.4", "BAR", "BAZ");
211+
settings.setString(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR, "Y");
212+
settings.setString("ConnectionType", "acceptor");
213+
settings.setString("StartTime", "00:00:00");
214+
settings.setString("EndTime", "00:00:00");
215+
settings.setString("HeartBtInt", "30");
216+
settings.setString("BeginString", "FIX.4.4");
217+
settings.setLong(sessionId, "SocketAcceptPort", port);
218+
settings.setLong(sessionId2, "SocketAcceptPort", port2);
219+
settings.setLong(sessionId3, "SocketAcceptPort", port2);
220+
settings.setString(sessionId, SSLSupport.SETTING_USE_SSL, "Y");
221+
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_NAME, "test.keystore");
222+
// supply a wrong password to make initialization fail
223+
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_PWD, "wrong-password");
224+
// supply a wrong protocol to make initialization fail
225+
settings.setString(sessionId3, "SocketAcceptProtocol", "foobar");
226+
227+
final SocketAcceptor acceptor = new SocketAcceptor(new ApplicationAdapter(), new MemoryStoreFactory(), settings,
228+
new ScreenLogFactory(settings), new DefaultMessageFactory());
229+
acceptor.start();
230+
231+
for (IoAcceptor endpoint : acceptor.getEndpoints()) {
232+
boolean containsFIXCodec = endpoint.getFilterChain().contains(FIXProtocolCodecFactory.FILTER_NAME);
233+
if (endpoint.getLocalAddress() == null) { // failing session is not bound!
234+
assertFalse(containsFIXCodec);
235+
} else {
236+
assertTrue(containsFIXCodec);
237+
}
238+
}
239+
240+
// sessionid1 is present since it fails after the setup phase
241+
assertTrue(acceptor.getSessions().contains(sessionId));
242+
// sessionid2 is set up normally
243+
assertTrue(acceptor.getSessions().contains(sessionId2));
244+
// sessionid3 could not be set up due to problems in the config itself
245+
assertFalse(acceptor.getSessions().contains(sessionId3));
246+
247+
acceptor.stop();
248+
}
249+
193250
private void checkThreads(ThreadMXBean bean, int expectedNum) {
194251
ThreadInfo[] dumpAllThreads = bean.dumpAllThreads(false, false);
195252
int qfjMPThreads = 0;

quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import static org.junit.Assert.assertFalse;
5555
import static org.junit.Assert.assertTrue;
5656
import quickfix.field.MsgType;
57+
import quickfix.mina.ssl.SSLSupport;
5758
import quickfix.test.util.ReflectionUtil;
5859

5960
public class SocketInitiatorTest {
@@ -405,6 +406,36 @@ public void onResendRequestSatisfied(int beginSeqNo, int endSeqNo) {
405406
assertEquals(1, onDisconnectCallCount.intValue());
406407
}
407408

409+
410+
@Test
411+
public void testInitiatorContinueInitializationOnError() throws ConfigError, InterruptedException, IOException {
412+
final ServerSocket serverSocket = new ServerSocket(0);
413+
final int port = serverSocket.getLocalPort();
414+
final SessionSettings settings = new SessionSettings();
415+
final SessionID sessionId = new SessionID("FIX.4.4", "SENDER", "TARGET");
416+
settings.setString(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR, "Y");
417+
settings.setString(sessionId, "BeginString", "FIX.4.4");
418+
settings.setString("ConnectionType", "initiator");
419+
settings.setLong(sessionId, "SocketConnectPort", port);
420+
settings.setString(sessionId, "SocketConnectHost", "localhost");
421+
settings.setString("StartTime", "00:00:00");
422+
settings.setString("EndTime", "00:00:00");
423+
settings.setString("HeartBtInt", "30");
424+
settings.setString("SocketConnectProtocol", ProtocolFactory.getTypeString(ProtocolFactory.SOCKET));
425+
settings.setString(sessionId, SSLSupport.SETTING_USE_SSL, "Y");
426+
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_NAME, "test.keystore");
427+
// supply a wrong password to make initialization fail
428+
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_PWD, "wrong-password");
429+
430+
final SocketInitiator initiator = new SocketInitiator(new ApplicationAdapter(), new MemoryStoreFactory(), settings,
431+
new ScreenLogFactory(settings), new DefaultMessageFactory());
432+
initiator.start();
433+
434+
assertTrue(initiator.getInitiators().isEmpty());
435+
initiator.stop();
436+
}
437+
438+
408439
private void doTestOfRestart(SessionID clientSessionID, ClientApplication clientApplication,
409440
final Initiator initiator, File messageLog, int port) throws InterruptedException, ConfigError {
410441
ServerThread serverThread = new ServerThread(port);

0 commit comments

Comments
 (0)