2020
2121import java .util .Collections ;
2222import java .util .Comparator ;
23+ import java .util .HashSet ;
24+ import java .util .List ;
2325import java .util .Set ;
26+ import java .util .concurrent .atomic .AtomicLong ;
2427
2528import org .neo4j .driver .internal .net .BoltServerAddress ;
2629import org .neo4j .driver .internal .security .SecurityPlan ;
2730import org .neo4j .driver .internal .spi .Connection ;
2831import org .neo4j .driver .internal .spi .ConnectionPool ;
32+ import org .neo4j .driver .internal .util .Clock ;
2933import org .neo4j .driver .internal .util .ConcurrentRoundRobinSet ;
3034import org .neo4j .driver .internal .util .Consumer ;
3135import org .neo4j .driver .v1 .AccessMode ;
3438import org .neo4j .driver .v1 .Record ;
3539import org .neo4j .driver .v1 .Session ;
3640import org .neo4j .driver .v1 .StatementResult ;
41+ import org .neo4j .driver .v1 .Value ;
3742import org .neo4j .driver .v1 .exceptions .ClientException ;
3843import org .neo4j .driver .v1 .exceptions .ConnectionFailureException ;
3944import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
4045import org .neo4j .driver .v1 .util .BiFunction ;
46+ import org .neo4j .driver .v1 .util .Function ;
4147
4248import static java .lang .String .format ;
4349
4450public class ClusterDriver extends BaseDriver
4551{
4652 private static final String GET_SERVERS = "dbms.cluster.routing.getServers" ;
53+ private static final long MAX_TTL = Long .MAX_VALUE / 1000L ;
4754 private final static Comparator <BoltServerAddress > COMPARATOR = new Comparator <BoltServerAddress >()
4855 {
4956 @ Override
5057 public int compare ( BoltServerAddress o1 , BoltServerAddress o2 )
5158 {
5259 int compare = o1 .host ().compareTo ( o2 .host () );
53- if (compare == 0 )
60+ if ( compare == 0 )
5461 {
5562 compare = Integer .compare ( o1 .port (), o2 .port () );
5663 }
5764
5865 return compare ;
5966 }
6067 };
61- private static final int MIN_SERVERS = 2 ;
68+ private static final int MIN_SERVERS = 1 ;
6269 private final ConnectionPool connections ;
63- private final BiFunction <Connection ,Logger , Session > sessionProvider ;
64-
65- private final ConcurrentRoundRobinSet <BoltServerAddress > routingServers = new ConcurrentRoundRobinSet <>(COMPARATOR );
66- private final ConcurrentRoundRobinSet <BoltServerAddress > readServers = new ConcurrentRoundRobinSet <>(COMPARATOR );
67- private final ConcurrentRoundRobinSet <BoltServerAddress > writeServers = new ConcurrentRoundRobinSet <>(COMPARATOR );
70+ private final BiFunction <Connection ,Logger ,Session > sessionProvider ;
71+ private final Clock clock ;
72+ private final ConcurrentRoundRobinSet <BoltServerAddress > routingServers =
73+ new ConcurrentRoundRobinSet <>( COMPARATOR );
74+ private final ConcurrentRoundRobinSet <BoltServerAddress > readServers = new ConcurrentRoundRobinSet <>( COMPARATOR );
75+ private final ConcurrentRoundRobinSet <BoltServerAddress > writeServers = new ConcurrentRoundRobinSet <>( COMPARATOR );
76+ private final AtomicLong expires = new AtomicLong ( 0L );
6877
6978 public ClusterDriver ( BoltServerAddress seedAddress ,
7079 ConnectionPool connections ,
7180 SecurityPlan securityPlan ,
72- BiFunction <Connection ,Logger , Session > sessionProvider ,
81+ BiFunction <Connection ,Logger ,Session > sessionProvider ,
82+ Clock clock ,
7383 Logging logging )
7484 {
7585 super ( securityPlan , logging );
7686 routingServers .add ( seedAddress );
7787 this .connections = connections ;
7888 this .sessionProvider = sessionProvider ;
89+ this .clock = clock ;
7990 checkServers ();
8091 }
8192
8293 private void checkServers ()
8394 {
8495 synchronized ( routingServers )
8596 {
86- if ( routingServers .size () < MIN_SERVERS ||
97+ if ( expires .get () < clock .millis () ||
98+ routingServers .size () < MIN_SERVERS ||
8799 readServers .isEmpty () ||
88- writeServers .isEmpty ())
100+ writeServers .isEmpty () )
89101 {
90102 getServers ();
91103 }
92104 }
93105 }
94106
107+ private Set <BoltServerAddress > forgetAllServers ()
108+ {
109+ final Set <BoltServerAddress > seen = new HashSet <>();
110+ seen .addAll ( routingServers );
111+ seen .addAll ( readServers );
112+ seen .addAll ( writeServers );
113+ routingServers .clear ();
114+ readServers .clear ();
115+ writeServers .clear ();
116+ return seen ;
117+ }
118+
119+ private long calculateNewExpiry ( Record record )
120+ {
121+ long ttl = record .get ( "ttl" ).asLong ();
122+ long nextExpiry = clock .millis () + 1000L * ttl ;
123+ if ( ttl < 0 || ttl >= MAX_TTL || nextExpiry < 0 )
124+ {
125+ return Long .MAX_VALUE ;
126+ }
127+ else
128+ {
129+ return nextExpiry ;
130+ }
131+ }
132+
95133 //must be called from a synchronized block
96134 private void getServers ()
97135 {
98136 BoltServerAddress address = null ;
99137 try
100138 {
101139 boolean success = false ;
102- while ( !routingServers .isEmpty () && !success )
140+
141+ ConcurrentRoundRobinSet <BoltServerAddress > routers = new ConcurrentRoundRobinSet <>( routingServers );
142+ final Set <BoltServerAddress > seen = forgetAllServers ();
143+ while ( !routers .isEmpty () && !success )
103144 {
104- address = routingServers .hop ();
145+ address = routers .hop ();
105146 success = call ( address , GET_SERVERS , new Consumer <Record >()
106147 {
107148 @ Override
108149 public void accept ( Record record )
109150 {
110- BoltServerAddress newAddress = new BoltServerAddress ( record .get ( "address" ).asString () );
111- switch ( record .get ( "mode" ).asString ().toUpperCase () )
151+ expires .set ( calculateNewExpiry ( record ) );
152+ List <ServerInfo > servers = servers ( record );
153+ for ( ServerInfo server : servers )
112154 {
113- case "READ" :
114- readServers .add ( newAddress );
115- break ;
116- case "WRITE" :
117- writeServers .add ( newAddress );
118- break ;
119- case "ROUTE" :
120- routingServers .add ( newAddress );
121- break ;
155+ seen .removeAll ( server .addresses () );
156+ switch ( server .role () )
157+ {
158+ case "READ" :
159+ readServers .addAll ( server .addresses () );
160+ break ;
161+ case "WRITE" :
162+ writeServers .addAll ( server .addresses () );
163+ break ;
164+ case "ROUTE" :
165+ routingServers .addAll ( server .addresses () );
166+ break ;
167+ }
122168 }
123169 }
124170 } );
@@ -127,6 +173,12 @@ public void accept( Record record )
127173 {
128174 throw new ServiceUnavailableException ( "Run out of servers" );
129175 }
176+
177+ //the server no longer think we should care about these
178+ for ( BoltServerAddress remove : seen )
179+ {
180+ connections .purge ( remove );
181+ }
130182 }
131183 catch ( ClientException ex )
132184 {
@@ -137,7 +189,7 @@ public void accept( Record record )
137189 this .close ();
138190 throw new ServiceUnavailableException (
139191 String .format ( "Server %s couldn't perform discovery" ,
140- address == null ? "`UNKNOWN`" : address .toString ()), ex );
192+ address == null ? "`UNKNOWN`" : address .toString () ), ex );
141193 }
142194 else
143195 {
@@ -146,14 +198,55 @@ public void accept( Record record )
146198 }
147199 }
148200
201+ private static class ServerInfo
202+ {
203+ private final List <BoltServerAddress > addresses ;
204+ private final String role ;
205+
206+ public ServerInfo ( List <BoltServerAddress > addresses , String role )
207+ {
208+ this .addresses = addresses ;
209+ this .role = role ;
210+ }
211+
212+ public String role ()
213+ {
214+ return role ;
215+ }
216+
217+ List <BoltServerAddress > addresses ()
218+ {
219+ return addresses ;
220+ }
221+ }
222+
223+ private List <ServerInfo > servers ( Record record )
224+ {
225+ return record .get ( "servers" ).asList ( new Function <Value ,ServerInfo >()
226+ {
227+ @ Override
228+ public ServerInfo apply ( Value value )
229+ {
230+ return new ServerInfo ( value .get ( "addresses" ).asList ( new Function <Value ,BoltServerAddress >()
231+ {
232+ @ Override
233+ public BoltServerAddress apply ( Value value )
234+ {
235+ return new BoltServerAddress ( value .asString () );
236+ }
237+ } ), value .get ( "role" ).asString () );
238+ }
239+ } );
240+ }
241+
149242 //must be called from a synchronized method
150243 private boolean call ( BoltServerAddress address , String procedureName , Consumer <Record > recorder )
151244 {
152245 Connection acquire = null ;
153246 Session session = null ;
154247 try
155248 {
156- acquire = connections .acquire (address );
249+ acquire = connections .acquire ( address );
157250 session = sessionProvider .apply ( acquire , log );
158251
159252 StatementResult records = session .run ( format ( "CALL %s" , procedureName ) );
@@ -217,19 +310,19 @@ public void onWriteFailure( BoltServerAddress address )
217310 log );
218311 }
219312
220- private Connection acquireConnection ( AccessMode mode )
313+ private Connection acquireConnection ( AccessMode role )
221314 {
222315 //Potentially rediscover servers if we are not happy with our current knowledge
223316 checkServers ();
224317
225- switch ( mode )
318+ switch ( role )
226319 {
227320 case READ :
228321 return connections .acquire ( readServers .hop () );
229322 case WRITE :
230323 return connections .acquire ( writeServers .hop () );
231324 default :
232- throw new ClientException ( mode + " is not supported for creating new sessions" );
325+ throw new ClientException ( role + " is not supported for creating new sessions" );
233326 }
234327 }
235328
@@ -255,13 +348,13 @@ Set<BoltServerAddress> routingServers()
255348 //For testing
256349 Set <BoltServerAddress > readServers ()
257350 {
258- return Collections .unmodifiableSet (readServers );
351+ return Collections .unmodifiableSet ( readServers );
259352 }
260353
261354 //For testing
262355 Set <BoltServerAddress > writeServers ()
263356 {
264- return Collections .unmodifiableSet ( writeServers );
357+ return Collections .unmodifiableSet ( writeServers );
265358 }
266359
267360 //For testing
0 commit comments