1818
1919import java .util .Map ;
2020import java .util .concurrent .ConcurrentHashMap ;
21- import java .util .concurrent .Executors ;
22- import java .util .concurrent .ScheduledExecutorService ;
23- import java .util .concurrent .ScheduledFuture ;
24- import java .util .concurrent .ThreadFactory ;
2521import java .util .concurrent .TimeUnit ;
2622
2723import static com .mongodb .ServerConnectionState .Connecting ;
28- import static java .util .concurrent .TimeUnit .MILLISECONDS ;
2924import static org .bson .util .Assertions .isTrue ;
3025import static org .bson .util .Assertions .notNull ;
3126
3227class DefaultServer implements ClusterableServer {
33-
34- private enum HeartbeatFrequency {
35- NORMAL {
36- @ Override
37- long getFrequencyMS (final ServerSettings settings ) {
38- return settings .getHeartbeatFrequency (MILLISECONDS );
39- }
40- },
41-
42- RETRY {
43- @ Override
44- long getFrequencyMS (final ServerSettings settings ) {
45- return settings .getHeartbeatConnectRetryFrequency (MILLISECONDS );
46- }
47- };
48-
49- abstract long getFrequencyMS (final ServerSettings settings );
50- }
51-
52- private final String clusterId ;
53- private final ScheduledExecutorService scheduledExecutorService ;
5428 private final ServerAddress serverAddress ;
55- private final ServerStateNotifier stateNotifier ;
29+ private final ServerMonitor serverMonitor ;
5630 private final PooledConnectionProvider connectionProvider ;
5731 private final Map <ChangeListener <ServerDescription >, Boolean > changeListeners =
5832 new ConcurrentHashMap <ChangeListener <ServerDescription >, Boolean >();
59- private final ServerSettings settings ;
6033 private final ChangeListener <ServerDescription > serverStateListener ;
6134 private volatile ServerDescription description ;
6235 private volatile boolean isClosed ;
6336
64- private ScheduledFuture <?> scheduledFuture ;
65- private HeartbeatFrequency currentFrequency ;
66-
6737 public DefaultServer (final ServerAddress serverAddress ,
6838 final ServerSettings settings ,
6939 final String clusterId , final PooledConnectionProvider connectionProvider ,
7040 final Mongo mongo ) {
71- this .clusterId = notNull ("clusterId" , clusterId );
72- this .settings = notNull ("settings" , settings );
7341 this .serverAddress = notNull ("serverAddress" , serverAddress );
7442 this .description = ServerDescription .builder ().state (Connecting ).address (serverAddress ).build ();
7543 serverStateListener = new DefaultServerStateListener ();
76- this .stateNotifier = new ServerStateNotifier (serverAddress , serverStateListener ,
77- settings .getHeartbeatSocketSettings (), mongo );
78- this .scheduledExecutorService = Executors .newSingleThreadScheduledExecutor (new DefaultThreadFactory ());
79- setHeartbeat (0 , HeartbeatFrequency .NORMAL );
44+ this .serverMonitor = new ServerMonitor (serverAddress , serverStateListener ,
45+ settings .getHeartbeatSocketSettings (), settings , clusterId , mongo );
46+ this .serverMonitor .start ();
8047 this .connectionProvider = connectionProvider ;
8148 }
8249
@@ -107,16 +74,13 @@ public void invalidate() {
10774 serverStateListener .stateChanged (new ChangeEvent <ServerDescription >(description , ServerDescription .builder ()
10875 .state (Connecting )
10976 .address (serverAddress ).build ()));
110- setHeartbeat (0 , HeartbeatFrequency .RETRY );
11177 connectionProvider .invalidate ();
11278 }
11379
11480 @ Override
11581 public void close () {
11682 if (!isClosed ()) {
117- scheduledFuture .cancel (true );
118- scheduledExecutorService .shutdownNow ();
119- stateNotifier .close ();
83+ serverMonitor .close ();
12084 connectionProvider .close ();
12185 isClosed = true ;
12286 }
@@ -127,34 +91,9 @@ public boolean isClosed() {
12791 return isClosed ;
12892 }
12993
130- private void setHeartbeat (final ChangeEvent <ServerDescription > event ) {
131- HeartbeatFrequency heartbeatFrequency = event .getNewValue ().getState () == Connecting
132- ? HeartbeatFrequency .RETRY
133- : HeartbeatFrequency .NORMAL ;
134- long initialDelay = heartbeatFrequency .getFrequencyMS (settings );
135- setHeartbeat (initialDelay , heartbeatFrequency );
136- }
137-
138- private synchronized void setHeartbeat (final long initialDelay , final HeartbeatFrequency newFrequency ) {
139- if (currentFrequency != newFrequency ) {
140- currentFrequency = newFrequency ;
141- if (scheduledFuture != null ) {
142- scheduledFuture .cancel (false );
143- }
144- scheduledFuture = scheduledExecutorService .scheduleAtFixedRate (stateNotifier , initialDelay ,
145- newFrequency .getFrequencyMS (settings ),
146- MILLISECONDS );
147- }
148- }
149-
150- // Custom thread factory for scheduled executor service that creates daemon threads. Otherwise,
151- // applications that neglect to close the MongoClient will not exit.
152- class DefaultThreadFactory implements ThreadFactory {
153- public Thread newThread (final Runnable runnable ) {
154- Thread t = new Thread (runnable , "cluster-" + clusterId + "-" + serverAddress );
155- t .setDaemon (true );
156- return t ;
157- }
94+ @ Override
95+ public void connect () {
96+ serverMonitor .connect ();
15897 }
15998
16099 private final class DefaultServerStateListener implements ChangeListener <ServerDescription > {
@@ -164,7 +103,6 @@ public void stateChanged(final ChangeEvent<ServerDescription> event) {
164103 for (ChangeListener <ServerDescription > listener : changeListeners .keySet ()) {
165104 listener .stateChanged (event );
166105 }
167- setHeartbeat (event );
168106 }
169107 }
170108}
0 commit comments