1818 */
1919package org .neo4j .driver .internal .cluster ;
2020
21- import java .util .HashSet ;
21+ import java .util .Set ;
2222
2323import org .neo4j .driver .internal .RoutingErrorHandler ;
2424import org .neo4j .driver .internal .net .BoltServerAddress ;
2828import org .neo4j .driver .v1 .Logger ;
2929import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
3030
31- import static java .util . Arrays . asList ;
31+ import static java .lang . String . format ;
3232
3333public final class LoadBalancer implements RoutingErrorHandler , AutoCloseable
3434{
35- private static final int MIN_ROUTERS = 1 ;
36- private static final String NO_ROUTERS_AVAILABLE = "Could not perform discovery. No routing servers available." ;
37- // dependencies
38- private final RoutingSettings settings ;
39- private final Clock clock ;
4035 private final Logger log ;
36+
4137 private final ConnectionPool connections ;
42- private final ClusterComposition .Provider provider ;
43- // state
44- private long expirationTimeout ;
45- private final RoundRobinAddressSet readers , writers , routers ;
38+ private final RoutingTable routingTable ;
39+ private final Rediscovery rediscovery ;
4640
4741 public LoadBalancer (
4842 RoutingSettings settings ,
@@ -51,40 +45,41 @@ public LoadBalancer(
5145 ConnectionPool connections ,
5246 BoltServerAddress ... routingAddresses ) throws ServiceUnavailableException
5347 {
54- this ( settings , clock , log , connections , new ClusterComposition . Provider . Default ( clock , log ),
55- routingAddresses );
48+ this ( settings , clock , log , connections , new ClusterRoutingTable ( clock , routingAddresses ),
49+ new ClusterComposition . Provider . Default ( clock , log ) );
5650 }
5751
58- LoadBalancer (
52+ private LoadBalancer (
5953 RoutingSettings settings ,
6054 Clock clock ,
6155 Logger log ,
6256 ConnectionPool connections ,
63- ClusterComposition .Provider provider ,
64- BoltServerAddress ... routingAddresses ) throws ServiceUnavailableException
57+ RoutingTable routingTable ,
58+ ClusterComposition .Provider provider ) throws ServiceUnavailableException
59+ {
60+ this ( log , connections , routingTable , new Rediscovery ( settings , clock , log , provider ) );
61+ }
62+
63+ LoadBalancer ( Logger log , ConnectionPool connections , RoutingTable routingTable , Rediscovery rediscovery )
64+ throws ServiceUnavailableException
6565 {
66- this .clock = clock ;
6766 this .log = log ;
6867 this .connections = connections ;
69- this .expirationTimeout = clock .millis () - 1 ;
70- this .provider = provider ;
71- this .settings = settings ;
72- this .readers = new RoundRobinAddressSet ();
73- this .writers = new RoundRobinAddressSet ();
74- this .routers = new RoundRobinAddressSet ();
75- routers .update ( new HashSet <>( asList ( routingAddresses ) ), new HashSet <BoltServerAddress >() );
68+ this .routingTable = routingTable ;
69+ this .rediscovery = rediscovery ;
70+
7671 // initialize the routing table
7772 ensureRouting ();
7873 }
7974
8075 public Connection acquireReadConnection () throws ServiceUnavailableException
8176 {
82- return acquireConnection ( readers );
77+ return acquireConnection ( routingTable . readers () );
8378 }
8479
8580 public Connection acquireWriteConnection () throws ServiceUnavailableException
8681 {
87- return acquireConnection ( writers );
82+ return acquireConnection ( routingTable . writers () );
8883 }
8984
9085 @ Override
@@ -96,7 +91,7 @@ public void onConnectionFailure( BoltServerAddress address )
9691 @ Override
9792 public void onWriteFailure ( BoltServerAddress address )
9893 {
99- writers . remove ( address );
94+ routingTable . removeWriter ( address );
10095 }
10196
10297 @ Override
@@ -119,7 +114,7 @@ private Connection acquireConnection( RoundRobinAddressSet servers ) throws Serv
119114 }
120115 catch ( ServiceUnavailableException e )
121116 {
122- log .error ( String . format ( "Failed to refresh routing information using routing address %s" ,
117+ log .error ( format ( "Failed to refresh routing information using routing address %s" ,
123118 address ), e );
124119
125120 forget ( address );
@@ -129,29 +124,31 @@ private Connection acquireConnection( RoundRobinAddressSet servers ) throws Serv
129124 }
130125 }
131126
127+ private synchronized void forget ( BoltServerAddress address )
128+ {
129+ // First remove from the load balancer, to prevent concurrent threads from making connections to them.
130+ routingTable .forget ( address );
131+ // drop all current connections to the address
132+ connections .purge ( address );
133+ }
134+
132135 private synchronized void ensureRouting () throws ServiceUnavailableException
133136 {
134- if ( stale () )
137+ if ( routingTable . isStale () )
135138 {
136- log .info ( "Routing information is stale. Ttl %s, currentTime %s, routers %s, writers %s, readers %s" ,
137- expirationTimeout , clock .millis (), routers , writers , readers );
139+ log .info ( "Routing information is stale. %s" , routingTable );
138140 try
139141 {
140142 // get a new routing table
141- ClusterComposition cluster = lookupRoutingTable ();
142- expirationTimeout = cluster .expirationTimestamp ;
143- HashSet <BoltServerAddress > removed = new HashSet <>();
144- readers .update ( cluster .readers (), removed );
145- writers .update ( cluster .writers (), removed );
146- routers .update ( cluster .routers (), removed );
143+ ClusterComposition cluster = rediscovery .lookupRoutingTable ( connections , routingTable );
144+ Set <BoltServerAddress > removed = routingTable .update ( cluster );
147145 // purge connections to removed addresses
148146 for ( BoltServerAddress address : removed )
149147 {
150148 connections .purge ( address );
151149 }
152150
153- log .info ( "Refreshed routing information. Ttl %s, routers %s, writers %s, readers %s" ,
154- expirationTimeout , routers , writers , readers );
151+ log .info ( "Refreshed routing information. %s" , routingTable );
155152 }
156153 catch ( InterruptedException e )
157154 {
@@ -160,79 +157,5 @@ private synchronized void ensureRouting() throws ServiceUnavailableException
160157 }
161158 }
162159
163- private ClusterComposition lookupRoutingTable () throws InterruptedException , ServiceUnavailableException
164- {
165- int size = routers .size (), failures = 0 ;
166- if ( size == 0 )
167- {
168- throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE );
169- }
170- for ( long start = clock .millis (), delay = 0 ; ; delay = Math .max ( settings .retryTimeoutDelay , delay * 2 ) )
171- {
172- long waitTime = start + delay - clock .millis ();
173- if ( waitTime > 0 )
174- {
175- clock .sleep ( waitTime );
176- }
177- start = clock .millis ();
178- for ( int i = 0 ; i < size ; i ++ )
179- {
180- BoltServerAddress address = routers .next ();
181- if ( address == null )
182- {
183- throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE );
184- }
185- ClusterComposition cluster ;
186- try ( Connection connection = connections .acquire ( address ) )
187- {
188- cluster = provider .getClusterComposition ( connection );
189- log .info ( "Got cluster composition %s" , cluster );
190- }
191- catch ( Exception e )
192- {
193- log .error ( String .format ( "Failed to connect to routing server '%s'." , address ), e );
194- continue ;
195- }
196- if ( cluster == null || !cluster .isValid () )
197- {
198- log .info (
199- "Server <%s> unable to perform routing capability, dropping from list of routers." ,
200- address );
201- routers .remove ( address );
202- if ( --size == 0 )
203- {
204- throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE );
205- }
206- }
207- else
208- {
209- return cluster ;
210- }
211- }
212- if ( ++failures >= settings .maxRoutingFailures )
213- {
214- throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE );
215- }
216- }
217- }
218-
219- private synchronized void forget ( BoltServerAddress address )
220- {
221- // First remove from the load balancer, to prevent concurrent threads from making connections to them.
222- // Don't remove it from the set of routers, since that might mean we lose our ability to re-discover,
223- // just remove it from the set of readers and writers, so that we don't use it for actual work without
224- // performing discovery first.
225- readers .remove ( address );
226- writers .remove ( address );
227- // drop all current connections to the address
228- connections .purge ( address );
229- }
230160
231- private boolean stale ()
232- {
233- return expirationTimeout < clock .millis () || // the expiration timeout has been reached
234- routers .size () <= MIN_ROUTERS || // we need to discover more routing servers
235- readers .size () == 0 || // we need to discover more read servers
236- writers .size () == 0 ; // we need to discover more write servers
237- }
238161}
0 commit comments