Skip to content

Commit d6e96de

Browse files
authored
Minor cleanup in Connectors (#199)
* Remove unneeded methods. * Cleanup sessions in any case on stop(). ** eventHandlingStrategy.stopHandlingMessages() might throw RuntimeException from onMessage() ** minor change: aligned initialize method to be in same position as in SocketAcceptor * Move isStarted flag to end of initialization. ** eventHandlingStrategy.blockInThread() returns immediately (as opposed to block() which could formerly be used) * Remove ConfigError on stopAcceptingConnections(). ** that Exception is never thrown * Minor reformatting. * Introduce default stop() method.
1 parent f77c26d commit d6e96de

File tree

7 files changed

+112
-136
lines changed

7 files changed

+112
-136
lines changed

quickfixj-core/src/main/java/quickfix/Connector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public interface Connector {
4141
* connections.
4242
* This method must not be called by several threads concurrently.
4343
*/
44-
void stop();
44+
default void stop() {
45+
stop(false);
46+
}
4547

4648
/**
4749
* Stops all sessions, optionally waiting for logout completion.

quickfixj-core/src/main/java/quickfix/SocketAcceptor.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -106,34 +106,28 @@ private void initialize() throws ConfigError {
106106
if (isStarted.equals(Boolean.FALSE)) {
107107
eventHandlingStrategy.setExecutor(longLivedExecutor);
108108
startAcceptingConnections();
109-
isStarted = Boolean.TRUE;
110109
eventHandlingStrategy.blockInThread();
110+
isStarted = Boolean.TRUE;
111111
} else {
112112
log.warn("Ignored attempt to start already running SocketAcceptor.");
113113
}
114114
}
115115

116-
@Override
117-
public void stop() {
118-
stop(false);
119-
}
120-
121116
@Override
122117
public void stop(boolean forceDisconnect) {
123118
if (isStarted.equals(Boolean.TRUE)) {
124119
try {
125-
try {
126-
logoutAllSessions(forceDisconnect);
127-
stopAcceptingConnections();
128-
} catch (ConfigError e) {
129-
log.error("Error when stopping acceptor.", e);
130-
}
120+
logoutAllSessions(forceDisconnect);
121+
stopAcceptingConnections();
131122
stopSessionTimer();
132123
} finally {
133-
eventHandlingStrategy.stopHandlingMessages();
134-
Session.unregisterSessions(getSessions(), true);
135-
clearConnectorSessions();
136-
isStarted = Boolean.FALSE;
124+
try {
125+
eventHandlingStrategy.stopHandlingMessages();
126+
} finally {
127+
Session.unregisterSessions(getSessions(), true);
128+
clearConnectorSessions();
129+
isStarted = Boolean.FALSE;
130+
}
137131
}
138132
}
139133
}

quickfixj-core/src/main/java/quickfix/SocketInitiator.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -110,27 +110,7 @@ public SocketInitiator(SessionFactory sessionFactory, SessionSettings settings,
110110
public void start() throws ConfigError, RuntimeError {
111111
initialize();
112112
}
113-
114-
@Override
115-
public void stop() {
116-
stop(false);
117-
}
118-
119-
@Override
120-
public void stop(boolean forceDisconnect) {
121-
if (isStarted.equals(Boolean.TRUE)) {
122-
try {
123-
logoutAllSessions(forceDisconnect);
124-
stopInitiators();
125-
} finally {
126-
eventHandlingStrategy.stopHandlingMessages();
127-
Session.unregisterSessions(getSessions(), true);
128-
clearConnectorSessions();
129-
isStarted = Boolean.FALSE;
130-
}
131-
}
132-
}
133-
113+
134114
private void initialize() throws ConfigError {
135115
if (isStarted.equals(Boolean.FALSE)) {
136116
eventHandlingStrategy.setExecutor(longLivedExecutor);
@@ -139,13 +119,31 @@ private void initialize() throws ConfigError {
139119
Session.registerSession(session);
140120
}
141121
startInitiators();
142-
isStarted = Boolean.TRUE;
143122
eventHandlingStrategy.blockInThread();
123+
isStarted = Boolean.TRUE;
144124
} else {
145125
log.warn("Ignored attempt to start already running SocketInitiator.");
146126
}
147127
}
148128

129+
@Override
130+
public void stop(boolean forceDisconnect) {
131+
if (isStarted.equals(Boolean.TRUE)) {
132+
try {
133+
logoutAllSessions(forceDisconnect);
134+
stopInitiators();
135+
} finally {
136+
try {
137+
eventHandlingStrategy.stopHandlingMessages();
138+
} finally {
139+
Session.unregisterSessions(getSessions(), true);
140+
clearConnectorSessions();
141+
isStarted = Boolean.FALSE;
142+
}
143+
}
144+
}
145+
}
146+
149147
@Override
150148
protected EventHandlingStrategy getEventHandlingStrategy() {
151149
return eventHandlingStrategy;

quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,32 +97,22 @@ public ThreadedSocketAcceptor(SessionFactory sessionFactory, SessionSettings set
9797
eventHandlingStrategy = new ThreadPerSessionEventHandlingStrategy(this, DEFAULT_QUEUE_CAPACITY);
9898
}
9999

100+
@Override
100101
public void start() throws ConfigError, RuntimeError {
101102
eventHandlingStrategy.setExecutor(longLivedExecutor);
102103
startAcceptingConnections();
103104
}
104105

105-
public void stop() {
106-
stop(false);
107-
}
108-
106+
@Override
109107
public void stop(boolean forceDisconnect) {
110-
try {
111-
logoutAllSessions(forceDisconnect);
112-
stopAcceptingConnections();
113-
} catch (ConfigError e) {
114-
log.error("Error when stopping acceptor.", e);
115-
}
108+
logoutAllSessions(forceDisconnect);
109+
stopAcceptingConnections();
116110
stopSessionTimer();
117111
eventHandlingStrategy.stopDispatcherThreads();
118112
Session.unregisterSessions(getSessions(), true);
119113
clearConnectorSessions();
120114
}
121115

122-
public void block() throws ConfigError, RuntimeError {
123-
throw new UnsupportedOperationException("Blocking not supported: " + getClass());
124-
}
125-
126116
@Override
127117
protected EventHandlingStrategy getEventHandlingStrategy() {
128118
return eventHandlingStrategy;

quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,14 @@ public ThreadedSocketInitiator(SessionFactory sessionFactory, SessionSettings se
9999
eventHandlingStrategy = new ThreadPerSessionEventHandlingStrategy(this, DEFAULT_QUEUE_CAPACITY);
100100
}
101101

102+
@Override
102103
public void start() throws ConfigError, RuntimeError {
103104
eventHandlingStrategy.setExecutor(longLivedExecutor);
104105
createSessionInitiators();
105106
startInitiators();
106107
}
107108

108-
public void stop() {
109-
stop(false);
110-
}
111-
109+
@Override
112110
public void stop(boolean forceDisconnect) {
113111
logoutAllSessions(forceDisconnect);
114112
stopInitiators();
@@ -117,10 +115,6 @@ public void stop(boolean forceDisconnect) {
117115
clearConnectorSessions();
118116
}
119117

120-
public void block() throws ConfigError, RuntimeError {
121-
throw new UnsupportedOperationException("Blocking not supported: " + getClass());
122-
}
123-
124118
@Override
125119
protected EventHandlingStrategy getEventHandlingStrategy() {
126120
return eventHandlingStrategy;

quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java

Lines changed: 72 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -86,27 +86,27 @@ public SessionConnector(SessionSettings settings, SessionFactory sessionFactory)
8686
}
8787
}
8888

89-
/**
90-
* <p>
91-
* Supplies the Executors to be used for all message processing and timer activities. This will override the default
92-
* behavior which uses internally created Threads. This enables scenarios such as a ResourceAdapter to supply the
93-
* WorkManager (when adapted to the Executor API) so that all Application call-backs occur on container managed
94-
* threads.
95-
* </p>
96-
* <p>
97-
* If using external Executors, this method should be called immediately after the constructor. Once set, the
98-
* Executors cannot be changed.
99-
* </p>
100-
*
101-
* @param executorFactory See {@link ExecutorFactory} for detailed requirements.
102-
*/
103-
public void setExecutorFactory(ExecutorFactory executorFactory) {
104-
if (longLivedExecutor != null || shortLivedExecutor!=null) {
105-
throw new IllegalStateException("Optional ExecutorFactory has already been set. It cannot be changed once set.");
106-
}
107-
longLivedExecutor = executorFactory.getLongLivedExecutor();
108-
shortLivedExecutor = executorFactory.getShortLivedExecutor();
109-
}
89+
/**
90+
* <p>
91+
* Supplies the Executors to be used for all message processing and timer activities. This will override the default
92+
* behavior which uses internally created Threads. This enables scenarios such as a ResourceAdapter to supply the
93+
* WorkManager (when adapted to the Executor API) so that all Application call-backs occur on container managed
94+
* threads.
95+
* </p>
96+
* <p>
97+
* If using external Executors, this method should be called immediately after the constructor. Once set, the
98+
* Executors cannot be changed.
99+
* </p>
100+
*
101+
* @param executorFactory See {@link ExecutorFactory} for detailed requirements.
102+
*/
103+
public void setExecutorFactory(ExecutorFactory executorFactory) {
104+
if (longLivedExecutor != null || shortLivedExecutor != null) {
105+
throw new IllegalStateException("Optional ExecutorFactory has already been set. It cannot be changed once set.");
106+
}
107+
longLivedExecutor = executorFactory.getLongLivedExecutor();
108+
shortLivedExecutor = executorFactory.getShortLivedExecutor();
109+
}
110110

111111
public void addPropertyChangeListener(PropertyChangeListener listener) {
112112
propertyChangeSupport.addPropertyChangeListener(listener);
@@ -312,11 +312,11 @@ private String getLogSuffix(SessionID sessionID, IoSession protocolSession) {
312312
}
313313

314314
protected void startSessionTimer() {
315-
Runnable timerTask = new SessionTimerTask();
316-
if (shortLivedExecutor != null) {
317-
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
318-
}
319-
sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L,
315+
Runnable timerTask = new SessionTimerTask();
316+
if (shortLivedExecutor != null) {
317+
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
318+
}
319+
sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L,
320320
TimeUnit.MILLISECONDS);
321321
log.info("SessionTimer started");
322322
}
@@ -352,54 +352,52 @@ public void run() {
352352
* Delegates QFJ Timer Task to an Executor and blocks the QFJ Timer Thread until
353353
* the Task execution completes.
354354
*/
355-
static final class DelegatingTask implements Runnable {
356-
357-
private final BlockingSupportTask delegate;
358-
private final Executor executor;
359-
360-
DelegatingTask(Runnable delegate, Executor executor) {
361-
this.delegate = new BlockingSupportTask(delegate);
362-
this.executor = executor;
363-
}
364-
365-
@Override
366-
public void run() {
367-
executor.execute(delegate);
368-
try {
369-
delegate.await();
370-
} catch (InterruptedException e) {
371-
}
372-
}
373-
374-
static final class BlockingSupportTask implements Runnable {
375-
376-
private final CountDownLatch latch = new CountDownLatch(1);
377-
private final Runnable delegate;
378-
379-
BlockingSupportTask(Runnable delegate) {
380-
this.delegate = delegate;
381-
}
382-
383-
@Override
384-
public void run() {
385-
Thread currentThread = Thread.currentThread();
386-
String threadName = currentThread.getName();
387-
try {
388-
currentThread.setName("QFJ Timer (" + threadName + ")");
389-
delegate.run();
390-
} finally {
391-
latch.countDown();
392-
currentThread.setName(threadName);
393-
}
394-
}
395-
396-
void await() throws InterruptedException {
397-
latch.await();
398-
}
399-
400-
}
401-
402-
}
355+
static final class DelegatingTask implements Runnable {
356+
357+
private final BlockingSupportTask delegate;
358+
private final Executor executor;
359+
360+
DelegatingTask(Runnable delegate, Executor executor) {
361+
this.delegate = new BlockingSupportTask(delegate);
362+
this.executor = executor;
363+
}
364+
365+
@Override
366+
public void run() {
367+
executor.execute(delegate);
368+
try {
369+
delegate.await();
370+
} catch (InterruptedException e) {
371+
}
372+
}
373+
374+
static final class BlockingSupportTask implements Runnable {
375+
376+
private final CountDownLatch latch = new CountDownLatch(1);
377+
private final Runnable delegate;
378+
379+
BlockingSupportTask(Runnable delegate) {
380+
this.delegate = delegate;
381+
}
382+
383+
@Override
384+
public void run() {
385+
Thread currentThread = Thread.currentThread();
386+
String threadName = currentThread.getName();
387+
try {
388+
currentThread.setName("QFJ Timer (" + threadName + ")");
389+
delegate.run();
390+
} finally {
391+
latch.countDown();
392+
currentThread.setName(threadName);
393+
}
394+
}
395+
396+
void await() throws InterruptedException {
397+
latch.await();
398+
}
399+
}
400+
}
403401

404402
private static class QFTimerThreadFactory implements ThreadFactory {
405403

quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ private void createSessions(SessionSettings settings) throws ConfigError, FieldC
242242
}
243243
}
244244

245-
protected void stopAcceptingConnections() throws ConfigError {
245+
protected void stopAcceptingConnections() {
246246
Iterator<IoAcceptor> ioIt = getEndpoints().iterator();
247247
while (ioIt.hasNext()) {
248248
IoAcceptor ioAcceptor = ioIt.next();

0 commit comments

Comments
 (0)