Skip to content

Commit 4cfaece

Browse files
ehennumehennum
authored andcommitted
connect only to primary host for load balancer scenario #999
1 parent 65221cf commit 4cfaece

23 files changed

+227
-125
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/DatabaseClient.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,22 @@ public interface DatabaseClient {
9999
*/
100100
XMLDocumentManager newXMLDocumentManager();
101101

102+
/**
103+
* Creates a manager for long-running asynchronous write or query jobs
104+
* with the default ConnectionPolicy.FOREST_HOSTS policy of creating
105+
* a new connection for each host that has forests for the database.
106+
* Don't forget to call dataMovementManager.release() when you're done with it.
107+
* @return a manager supporting long-running asynchronous write or query jobs
108+
*/
109+
DataMovementManager newDataMovementManager();
110+
102111
/**
103112
* Creates a manager for long-running asynchronous write or query jobs. Don't forget
104113
* to call dataMovementManager.release() when you're done with it.
114+
* @param policy how the DataMovementManager should create connections
105115
* @return a manager supporting long-running asynchronous write or query jobs
106116
*/
107-
DataMovementManager newDataMovementManager();
117+
DataMovementManager newDataMovementManager(ConnectionPolicy policy);
108118

109119
/**
110120
* Creates a manager to query for database documents.
@@ -212,6 +222,12 @@ public interface DatabaseClient {
212222
*/
213223
ServerEvaluationCall newServerEval();
214224

225+
/**
226+
* Specifies whether to connect only using the host from the initial database connection
227+
* or to create a new connection for each host that has forests for the database.
228+
*/
229+
public enum ConnectionPolicy{PRIMARY_HOST, FOREST_HOSTS}
230+
215231
String getHost();
216232

217233
int getPort();

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DataMovementManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,11 @@ public interface DataMovementManager {
184184
* @return the latest ForestConfiguration from the server
185185
*/
186186
public ForestConfiguration readForestConfig();
187+
188+
/**
189+
* Identify the DataMovementManager uses only the primary database connection
190+
* or manages a connection for each host that has forests for the database.
191+
* @return the connection policy
192+
*/
193+
public DatabaseClient.ConnectionPolicy getConnectionPolicy();
187194
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/FilteredForestConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.marklogic.client.datamovement;
1717

1818
import com.marklogic.client.datamovement.Forest.HostType;
19+
import com.marklogic.client.datamovement.impl.AnyForestConfiguration;
1920
import com.marklogic.client.datamovement.impl.ForestConfigurationImpl;
2021
import com.marklogic.client.datamovement.impl.ForestImpl;
2122

@@ -65,6 +66,9 @@ public class FilteredForestConfiguration implements ForestConfiguration {
6566
* @param forestConfig the ForestConfiguration to wrap
6667
*/
6768
public FilteredForestConfiguration(ForestConfiguration forestConfig) {
69+
if (forestConfig instanceof AnyForestConfiguration) {
70+
throw new IllegalArgumentException("cannot filter the forest hosts when using only the primary host");
71+
}
6872
this.wrappedForestConfig = forestConfig;
6973
}
7074

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/HostAvailabilityListener.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.marklogic.client.datamovement;
1717

18+
import com.marklogic.client.DatabaseClient;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021

@@ -38,7 +39,7 @@
3839
/** <p>HostAvailabilityListener is automatically registered with all QueryBatcher
3940
* and WriteBatcher instances to monitor for failover scenarios. When
4041
* HostAvailabilityListener detects that a host is unavailable (matches one of
41-
* {@link #getHostUnavailableExceptions()}), it black-lists the host for a
42+
* {@link #getHostUnavailableExceptions()}), it blacklists the MarkLogic host for a
4243
* period of time equal to {@link #getSuspendTimeForHostUnavailable()}. After
4344
* that time, it calls {@link DataMovementManager#readForestConfig()} then
4445
* passes that updated ForestConfiguration to batcher.withForestConfig() so the
@@ -101,6 +102,8 @@ public void processFailure(QueryBatch batch, Throwable throwable) {
101102
}
102103

103104
/**
105+
* Manages refreshing the connection hosts and retrying events after a host
106+
* becomes unavailable.
104107
* @param moveMgr the DataMovementManager (used to call readForestConfig to reset after black-listing an unavailable host)
105108
*/
106109
public HostAvailabilityListener(DataMovementManager moveMgr) {
@@ -129,15 +132,22 @@ public HostAvailabilityListener withSuspendTimeForHostUnavailable(Duration
129132
* @return this instance (for method chaining)
130133
*/
131134
public HostAvailabilityListener withMinHosts(int numHosts) {
132-
if (numHosts <= 0) throw new IllegalArgumentException("numHosts must be > 0");
133-
int numConfigHosts = moveMgr.readForestConfig().getPreferredHosts().length;
134-
if (numHosts > numConfigHosts) throw new IllegalArgumentException
135-
("numHosts must be less than or equal to the number of hosts in the cluster");
135+
if (moveMgr.getConnectionPolicy() == DatabaseClient.ConnectionPolicy.PRIMARY_HOST) {
136+
if (numHosts != 0) {
137+
throw new IllegalArgumentException("numHosts must be 1 when using only the primary host for the connection");
138+
}
139+
} else {
140+
if (numHosts <= 0) throw new IllegalArgumentException("numHosts must be > 0");
141+
int numConfigHosts = moveMgr.readForestConfig().getPreferredHosts().length;
142+
if (numHosts > numConfigHosts) throw new IllegalArgumentException
143+
("numHosts must be less than or equal to the number of hosts in the cluster");
144+
}
136145
this.minHosts = numHosts;
137146
return this;
138147
}
139148

140-
/** Overwrites the list of exceptions for which a host will be blacklisted
149+
/** Overwrites the list of exceptions for which a request can be retried and
150+
* a MarkLogic host can be blacklisted
141151
*
142152
* @param exceptionTypes the list of types of Throwable, any of which constitute a host that's unavailable
143153
*
@@ -152,7 +162,7 @@ public HostAvailabilityListener withHostUnavailableExceptions(Class<Throwable>..
152162
}
153163

154164
/**
155-
* @return the list of types of Throwable, any of which constitute a host that's unavailable
165+
* @return the list of types of Throwable, any of which constitute a MarkLogic host that's unavailable
156166
*/
157167
public Throwable[] getHostUnavailableExceptions() {
158168
return hostUnavailableExceptions.toArray(new Throwable[hostUnavailableExceptions.size()]);
@@ -217,6 +227,19 @@ private synchronized boolean processException(Batcher batcher, Throwable throwab
217227
// we only do something if this throwable is on our list of exceptions
218228
// which we consider marking a host as unavilable
219229
boolean isHostUnavailableException = isHostUnavailableException(throwable, new HashSet<>());
230+
return (moveMgr.getConnectionPolicy() == DatabaseClient.ConnectionPolicy.PRIMARY_HOST) ?
231+
processPrimaryHostException(batcher, throwable, host, isHostUnavailableException) :
232+
processForestHostException(batcher, throwable, host, isHostUnavailableException);
233+
}
234+
235+
private boolean processPrimaryHostException(Batcher batcher, Throwable throwable, String host, boolean isHostUnavailableException) {
236+
// TODO: currently, cancelling the job because the load balancer is down would require
237+
// a custom listener; determine whether the approach could be simplified with an array
238+
// of exceptions different from the exceptions that indicate the MarkLogic host is unavailable
239+
return isHostUnavailableException;
240+
}
241+
242+
private boolean processForestHostException(Batcher batcher, Throwable throwable, String host, boolean isHostUnavailableException) {
220243
boolean shouldWeRetry = isHostUnavailableException;
221244
if ( isHostUnavailableException == true ) {
222245
ForestConfiguration existingForestConfig = batcher.getForestConfig();
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.marklogic.client.datamovement.impl;
2+
3+
import com.marklogic.client.DatabaseClient;
4+
import com.marklogic.client.datamovement.Forest;
5+
import com.marklogic.client.datamovement.ForestConfiguration;
6+
7+
public class AnyForestConfiguration implements ForestConfiguration {
8+
private Forest[] anyForests;
9+
private String[] primaryHosts;
10+
11+
AnyForestConfiguration(DatabaseClient client) {
12+
if (client == null) throw new IllegalArgumentException("client argument must not be null");
13+
14+
String primaryHost = client.getHost();
15+
String database = client.getDatabase();
16+
17+
this.anyForests = new Forest[]{new AnyForest(primaryHost, database,true, false)};
18+
this.primaryHosts = new String[]{primaryHost};
19+
}
20+
21+
@Override
22+
public Forest[] listForests() {
23+
return anyForests;
24+
}
25+
@Override
26+
public String[] getPreferredHosts() {
27+
return primaryHosts;
28+
}
29+
30+
static public class AnyForest extends ForestImpl {
31+
private AnyForest(String primaryHost, String databaseName, boolean isUpdateable, boolean isDeleteOnly) {
32+
super(primaryHost, primaryHost, primaryHost, primaryHost, (databaseName == null) ? "" : databaseName,
33+
"*ANY*", "", isUpdateable, isDeleteOnly);
34+
}
35+
}
36+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,17 @@ public class DataMovementManagerImpl implements DataMovementManager {
5454
private static ConcurrentHashMap<String, JobTicket> activeJobs = new ConcurrentHashMap<>();
5555
private ForestConfiguration forestConfig;
5656
private DatabaseClient primaryClient;
57+
private DatabaseClient.ConnectionPolicy connectPolicy;
5758
// clientMap key is the hostname_database
5859
private Map<String,DatabaseClient> clientMap = new HashMap<>();
5960

60-
public DataMovementManagerImpl(DatabaseClient client) {
61+
public DataMovementManagerImpl(DatabaseClient client, DatabaseClient.ConnectionPolicy connectPolicy) {
6162
setPrimaryClient(client);
6263
clientMap.put(primaryClient.getHost(), primaryClient);
64+
this.connectPolicy = connectPolicy;
65+
if (connectPolicy == DatabaseClient.ConnectionPolicy.PRIMARY_HOST) {
66+
forestConfig = new AnyForestConfiguration(client);
67+
}
6368
}
6469

6570
@Override
@@ -137,19 +142,16 @@ public QueryBatcher newQueryBatcher(RawCombinedQueryDefinition query) {
137142

138143
private QueryBatcher newQueryBatcherImpl(QueryDefinition query) {
139144
if ( query == null ) throw new IllegalArgumentException("query must not be null");
140-
QueryBatcherImpl batcher = new QueryBatcherImpl(query, this, getForestConfig());
141-
batcher.onQueryFailure(new HostAvailabilityListener(this));
142-
QueryJobReportListener queryJobListener = new QueryJobReportListener();
143-
batcher.onQueryFailure(queryJobListener);
144-
batcher.onQueryFailure(new NoResponseListener(this));
145-
batcher.onUrisReady(queryJobListener);
146-
return batcher;
145+
return newQueryBatcher(new QueryBatcherImpl(query, this, getForestConfig()));
147146
}
148147

149148
@Override
150149
public QueryBatcher newQueryBatcher(Iterator<String> iterator) {
151150
if ( iterator == null ) throw new IllegalArgumentException("iterator must not be null");
152-
QueryBatcherImpl batcher = new QueryBatcherImpl(iterator, this, getForestConfig());
151+
return newQueryBatcher(new QueryBatcherImpl(iterator, this, getForestConfig()));
152+
}
153+
154+
private QueryBatcher newQueryBatcher(QueryBatcherImpl batcher) {
153155
// add a default listener to handle host failover scenarios
154156
batcher.onQueryFailure(new HostAvailabilityListener(this));
155157
QueryJobReportListener queryJobListener = new QueryJobReportListener();
@@ -166,7 +168,9 @@ private ForestConfiguration getForestConfig() {
166168

167169
@Override
168170
public ForestConfiguration readForestConfig() {
169-
forestConfig = service.readForestConfig();
171+
if (connectPolicy == DatabaseClient.ConnectionPolicy.FOREST_HOSTS) {
172+
forestConfig = service.readForestConfig();
173+
}
170174
return forestConfig;
171175
}
172176

@@ -203,6 +207,11 @@ public JobTicket getActiveJob(String jobId) {
203207
}
204208
}
205209

210+
@Override
211+
public DatabaseClient.ConnectionPolicy getConnectionPolicy() {
212+
return connectPolicy;
213+
}
214+
206215
public DataMovementServices getDataMovementServices() {
207216
return service;
208217
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,9 @@ public void run() {
627627
// this try-with-resources block will call results.close() once the block is done
628628
// here we call the /v1/internal/uris endpoint to get the text/uri-list of documents
629629
// matching this structured or string query
630-
try ( UrisHandle results = queryMgr.uris(query, handle, start, null, forest.getForestName()) ) {
630+
try ( UrisHandle results = queryMgr.uris(
631+
query, handle, start, null, (forest instanceof AnyForestConfiguration.AnyForest) ? null : forest.getForestName()
632+
) ) {
631633
// if we're doing consistentSnapshot and this is the first result set, let's capture the
632634
// serverTimestamp so we can use it for all future queries
633635
if ( consistentSnapshot == true && serverTimestamp.get() == -1 ) {
@@ -712,10 +714,8 @@ private void shutdownIfAllForestsAreDone() {
712714
}
713715
// if we made it this far, all forests are done. let's run the Job
714716
// completion listeners and shutdown.
715-
if ( !runJobCompletionListeners.isAlive() ) {
716-
synchronized(this) {
717-
if ( !runJobCompletionListeners.isAlive() ) runJobCompletionListeners.start();
718-
}
717+
synchronized(this) {
718+
if ( !runJobCompletionListeners.isAlive() ) runJobCompletionListeners.start();
719719
}
720720
threadPool.shutdown();
721721
}

marklogic-client-api/src/main/java/com/marklogic/client/impl/DatabaseClientImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,14 @@ public QueryManager newQueryManager() {
136136
}
137137
@Override
138138
public DataMovementManager newDataMovementManager() {
139-
DataMovementManagerImpl moveMgr = new DataMovementManagerImpl(this);
139+
return newDataMovementManager(null);
140+
}
141+
@Override
142+
public DataMovementManager newDataMovementManager(ConnectionPolicy policy) {
143+
DataMovementManagerImpl moveMgr = new DataMovementManagerImpl(
144+
this,
145+
(policy == null) ? ConnectionPolicy.FOREST_HOSTS : policy
146+
);
140147
return moveMgr;
141148
}
142149
@Override

marklogic-client-api/src/test/java/com/marklogic/client/test/Common.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,14 @@ public class Common {
6161
final public static String WRITE_PRIVILIGED_PASS = "x";
6262

6363
final public static String HOST = System.getProperty("TEST_HOST", "localhost");
64+
6465
final public static int PORT = Integer.parseInt(System.getProperty("TEST_PORT", "8012"));
65-
final public static boolean PLACE_DIRECT = Boolean.parseBoolean(System.getProperty("TEST_PLACE_DIRECT", "true"));
6666
final public static boolean WITH_WAIT = Boolean.parseBoolean(System.getProperty("TEST_WAIT", "false"));
6767
final public static int PROPERTY_WAIT = Integer.parseInt(System.getProperty("TEST_PROPERTY_WAIT", WITH_WAIT ? "8200" : "0"));
6868

69+
final public static DatabaseClient.ConnectionPolicy CONNECT_POLICY =
70+
DatabaseClient.ConnectionPolicy.valueOf(System.getProperty("TEST_CONNECT_POLICY", "FOREST_HOSTS"));
71+
6972
public static DatabaseClient client;
7073
public static DatabaseClient adminClient;
7174
public static DatabaseClient serverAdminClient;
@@ -197,20 +200,6 @@ public static Document testStringToDocument(String document) {
197200
throw new RuntimeException(e);
198201
}
199202
}
200-
public static <T extends Batcher> T initBatcher(DataMovementManager moveMgr, T batcher) {
201-
if (moveMgr == null || batcher == null || PLACE_DIRECT == true) {
202-
return batcher;
203-
}
204-
return (T) batcher.withForestConfig(
205-
new FilteredForestConfiguration(moveMgr.readForestConfig()).withWhiteList(HOST)
206-
);
207-
}
208-
public static ForestConfiguration initForestConfig(ForestConfiguration forestConfig) {
209-
if (forestConfig == null || PLACE_DIRECT == true) {
210-
return forestConfig;
211-
}
212-
return new FilteredForestConfiguration(forestConfig).withWhiteList(HOST);
213-
}
214203
public static void propertyWait() {
215204
waitFor(PROPERTY_WAIT);
216205
}

marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/ApplyTransformTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
public class ApplyTransformTest {
4545
private static DatabaseClient client = Common.connect();
46-
private static DataMovementManager moveMgr = client.newDataMovementManager();
46+
private static DataMovementManager moveMgr = client.newDataMovementManager(Common.CONNECT_POLICY);
4747
private static GenericDocumentManager docMgr = client.newDocumentManager();
4848
private static QueryManager queryMgr = client.newQueryManager();
4949
private static StructuredQueryBuilder sqb = new StructuredQueryBuilder();
@@ -85,7 +85,7 @@ public void testResultReplace() throws Exception {
8585
ApplyTransformListener listener = new ApplyTransformListener()
8686
.withTransform(transform)
8787
.withApplyResult(ApplyResult.REPLACE);
88-
QueryBatcher batcher = Common.initBatcher(moveMgr, moveMgr.newQueryBatcher(query))
88+
QueryBatcher batcher = moveMgr.newQueryBatcher(query)
8989
.onUrisReady(listener);
9090
JobTicket ticket = moveMgr.startJob( batcher );
9191
batcher.awaitCompletion();
@@ -108,7 +108,7 @@ public void testResultIgnore() throws Exception {
108108
ApplyTransformListener listener = new ApplyTransformListener()
109109
.withTransform(transform)
110110
.withApplyResult(ApplyResult.IGNORE);
111-
QueryBatcher batcher = Common.initBatcher(moveMgr, moveMgr.newQueryBatcher(query))
111+
QueryBatcher batcher = moveMgr.newQueryBatcher(query)
112112
.onUrisReady(listener);
113113
JobTicket ticket = moveMgr.startJob( batcher );
114114
batcher.awaitCompletion();
@@ -124,7 +124,7 @@ public void testManyDocs() throws Exception {
124124
DocumentMetadataHandle meta = new DocumentMetadataHandle().withCollections(collection);
125125
int numDocs = 1000;
126126
// write the documents
127-
WriteBatcher batcher1 = Common.initBatcher(moveMgr, moveMgr.newWriteBatcher())
127+
WriteBatcher batcher1 = moveMgr.newWriteBatcher()
128128
.withBatchSize(100)
129129
.onBatchFailure((batch, throwable) -> throwable.printStackTrace());
130130
JobTicket ticket1 = moveMgr.startJob( batcher1 );
@@ -145,7 +145,7 @@ public void testManyDocs() throws Exception {
145145
.withApplyResult(ApplyResult.REPLACE)
146146
.onSuccess(batch -> count2.addAndGet(batch.getItems().length))
147147
.onBatchFailure((batch, throwable) -> throwable.printStackTrace());
148-
QueryBatcher batcher = Common.initBatcher(moveMgr, moveMgr.newQueryBatcher(query2))
148+
QueryBatcher batcher = moveMgr.newQueryBatcher(query2)
149149
.onUrisReady(listener)
150150
.withConsistentSnapshot();
151151
JobTicket ticket2 = moveMgr.startJob( batcher );
@@ -157,7 +157,7 @@ public void testManyDocs() throws Exception {
157157
StructuredQueryDefinition query3 = sqb.value(sqb.jsonProperty("testProperty"), "test3a");
158158
query3.setCollections(collection);
159159
final AtomicInteger count3 = new AtomicInteger(0);
160-
QueryBatcher batcher3 = Common.initBatcher(moveMgr, moveMgr.newQueryBatcher(query3))
160+
QueryBatcher batcher3 = moveMgr.newQueryBatcher(query3)
161161
.withBatchSize(100)
162162
.onUrisReady(batch -> count3.addAndGet(batch.getItems().length))
163163
.onQueryFailure((throwable) -> throwable.printStackTrace());
@@ -183,7 +183,7 @@ public void testOnSkipped() throws Exception {
183183
ServerTransform transform = new ServerTransform(transformName1);
184184
List skippedUris = new ArrayList<>();
185185
StringBuilder failures = new StringBuilder();
186-
QueryBatcher batcher = Common.initBatcher(moveMgr, moveMgr.newQueryBatcher(uris.iterator()))
186+
QueryBatcher batcher = moveMgr.newQueryBatcher(uris.iterator())
187187
.withBatchSize(1)
188188
.onUrisReady(
189189
new ApplyTransformListener()

0 commit comments

Comments
 (0)