Skip to content

Commit 6a6eb89

Browse files
ehennum authored and committed
outer retry on changed forests in load balancer case #999
1 parent 232ab63 commit 6a6eb89

12 files changed

+142
-93
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2012-2018 MarkLogic Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.marklogic.client;
17+
18+
import com.marklogic.client.impl.FailedRequest;
19+
20+
/**
21+
* A FailedRetryException is used to capture and report when retry
22+
* of the request timed out or failed in some other way.
23+
*/
24+
@SuppressWarnings("serial")
25+
public class FailedRetryException extends FailedRequestException {
26+
27+
public FailedRetryException(String message) {
28+
super(message);
29+
}
30+
31+
public FailedRetryException(String localMessage, Throwable cause) {
32+
super(localMessage, cause);
33+
}
34+
35+
public FailedRetryException(String localMessage, FailedRequest failedRequest) {
36+
super(localMessage, failedRequest);
37+
}
38+
39+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/FilteredForestConfiguration.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.marklogic.client.datamovement;
1717

1818
import com.marklogic.client.datamovement.Forest.HostType;
19-
import com.marklogic.client.datamovement.impl.GatewayForestConfiguration;
2019
import com.marklogic.client.datamovement.impl.ForestImpl;
2120

2221
import java.util.HashMap;
@@ -61,9 +60,6 @@ public class FilteredForestConfiguration implements ForestConfiguration {
6160
* @param forestConfig the ForestConfiguration to wrap
6261
*/
6362
public FilteredForestConfiguration(ForestConfiguration forestConfig) {
64-
if (forestConfig instanceof GatewayForestConfiguration) {
65-
throw new IllegalArgumentException("cannot filter the forest hosts when connecting using a gateway");
66-
}
6763
this.wrappedForestConfig = forestConfig;
6864
}
6965

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/HostAvailabilityListener.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.marklogic.client.datamovement;
1717

1818
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.FailedRetryException;
1920
import org.slf4j.Logger;
2021
import org.slf4j.LoggerFactory;
2122

@@ -97,7 +98,7 @@ public void processFailure(QueryBatch batch, Throwable throwable) {
9798
}
9899

99100
/**
100-
* Manages refreshing the connection hosts and retrying events after a host
101+
* Manages refreshing the forests and hosts and retrying events after a host
101102
* becomes unavailable.
102103
* @param moveMgr the DataMovementManager (used to call readForestConfig to reset after black-listing an unavailable host)
103104
*/
@@ -231,8 +232,26 @@ private synchronized boolean processException(Batcher batcher, Throwable throwab
231232
}
232233

233234
private boolean processGatewayException(Batcher batcher, Throwable throwable, String host) {
234-
// if the nested retry failed, assume the MarkLogic cluster is unavailable
235-
return false;
235+
boolean isRetryException = (throwable instanceof FailedRetryException);
236+
boolean shouldWeRetry = false;
237+
if ( isRetryException == true ) {
238+
ForestConfiguration existingForestConfig = batcher.getForestConfig();
239+
Forest[] existingForests = existingForestConfig.listForests();
240+
Set<String> existingNames =
241+
Arrays.stream(existingForests).map(Forest::getForestName).collect(Collectors.toSet());
242+
ForestConfiguration updatedForestConfig = moveMgr.readForestConfig();
243+
Forest[] updatedForests = updatedForestConfig.listForests();
244+
boolean changedForests =
245+
Arrays.stream(updatedForests).anyMatch(forest -> existingNames.contains(forest.getForestName()));
246+
if (changedForests) {
247+
batcher.withForestConfig(updatedForestConfig);
248+
shouldWeRetry = true;
249+
} else {
250+
logger.error("retry exception without forest failover for " + batcher.getJobName() + "\"", throwable);
251+
moveMgr.stopJob(batcher);
252+
}
253+
}
254+
return shouldWeRetry;
236255
}
237256

238257
private boolean processForestHostException(Batcher batcher, Throwable throwable, String host) {

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatcherImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.marklogic.client.DatabaseClient;
1919
import com.marklogic.client.datamovement.Batcher;
20+
import com.marklogic.client.datamovement.DataMovementManager;
2021
import com.marklogic.client.datamovement.ForestConfiguration;
2122

2223
public abstract class BatcherImpl implements Batcher {
@@ -26,6 +27,15 @@ public abstract class BatcherImpl implements Batcher {
2627
private int threadCount = 1;
2728
private ForestConfiguration forestConfig;
2829
private DatabaseClient client;
30+
private DataMovementManagerImpl moveMgr;
31+
32+
/**
 * Stores the manager this batcher works with.
 *
 * @param moveMgr the manager; must be a non-null DataMovementManagerImpl
 * @throws IllegalArgumentException if moveMgr is null or is not a
 *         DataMovementManagerImpl
 */
protected BatcherImpl(DataMovementManager moveMgr) {
  if (moveMgr == null) {
    throw new IllegalArgumentException("moveMgr must not be null");
  }
  if (moveMgr instanceof DataMovementManagerImpl) {
    this.moveMgr = (DataMovementManagerImpl) moveMgr;
  } else {
    throw new IllegalArgumentException("moveMgr must be DataMovementManagerImpl");
  }
}
2939

3040
@Override
3141
public Batcher withJobName(String jobName) {
@@ -85,6 +95,8 @@ public ForestConfiguration getForestConfig() {
8595
@Override
8696
public Batcher withForestConfig(ForestConfiguration forestConfig) {
8797
if ( forestConfig == null ) throw new IllegalArgumentException("forestConfig must not be null");
98+
if (moveMgr.getConnectionType() == DatabaseClient.ConnectionType.GATEWAY && !(forestConfig instanceof ForestConfigurationImpl))
99+
throw new IllegalArgumentException("cannot change internal forestConfig when using a gateway");
88100
this.forestConfig = forestConfig;
89101
return this;
90102
}
@@ -94,4 +106,8 @@ public Batcher withForestConfig(ForestConfiguration forestConfig) {
94106

95107
@Override
96108
public abstract boolean isStarted();
109+
110+
public DataMovementManagerImpl getMoveMgr() {
111+
return moveMgr;
112+
}
97113
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ public ForestConfiguration readForestConfig() {
163163

164164
public DatabaseClient getForestClient(Forest forest) {
165165
if ( forest == null ) throw new IllegalArgumentException("forest must not be null");
166+
if (getConnectionType() == DatabaseClient.ConnectionType.GATEWAY) {
167+
return getPrimaryClient();
168+
}
166169
String hostName = forest.getPreferredHost();
167170
String key = hostName;
168171
DatabaseClient client = clientMap.get(key);

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementServices.java

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,15 @@
2121

2222
import com.fasterxml.jackson.databind.JsonNode;
2323
import com.marklogic.client.DatabaseClient;
24-
import com.marklogic.client.datamovement.*;
2524
import com.marklogic.client.impl.DatabaseClientImpl;
2625
import com.marklogic.client.io.JacksonHandle;
26+
import com.marklogic.client.datamovement.Batcher;
27+
import com.marklogic.client.datamovement.DataMovementException;
28+
import com.marklogic.client.datamovement.JobTicket;
2729
import com.marklogic.client.datamovement.JobTicket.JobType;
30+
import com.marklogic.client.datamovement.QueryBatcher;
31+
import com.marklogic.client.datamovement.WriteBatcher;
32+
import com.marklogic.client.datamovement.JobReport;
2833
import com.marklogic.client.datamovement.impl.ForestConfigurationImpl;
2934
import java.util.List;
3035

@@ -41,8 +46,6 @@ public DataMovementServices setClient(DatabaseClient client) {
4146
}
4247

4348
public ForestConfigurationImpl readForestConfig() {
44-
DatabaseClient.ConnectionType connectionType = client.getConnectionType();
45-
4649
List<ForestImpl> forests = new ArrayList<>();
4750
JsonNode results = ((DatabaseClientImpl) client).getServices()
4851
.getResource(null, "internal/forestinfo", null, null, new JacksonHandle())
@@ -51,34 +54,26 @@ public ForestConfigurationImpl readForestConfig() {
5154
String id = forestNode.get("id").asText();
5255
String name = forestNode.get("name").asText();
5356
String database = forestNode.get("database").asText();
57+
String host = forestNode.get("host").asText();
58+
String openReplicaHost = null;
59+
if ( forestNode.get("openReplicaHost") != null ) openReplicaHost = forestNode.get("openReplicaHost").asText();
60+
String requestHost = null;
61+
if ( forestNode.get("requestHost") != null ) requestHost = forestNode.get("requestHost").asText();
62+
String alternateHost = null;
63+
if ( forestNode.get("alternateHost") != null ) alternateHost = forestNode.get("alternateHost").asText();
64+
// Since we added the forestinfo end point to populate both alternateHost and requestHost
65+
// in case we have a requestHost so that we don't break the existing API code, we will make the
66+
// alternateHost as null if both alternateHost and requestHost is set.
67+
if ( requestHost != null && alternateHost != null )
68+
alternateHost = null;
5469
boolean isUpdateable = "all".equals(forestNode.get("updatesAllowed").asText());
5570
boolean isDeleteOnly = false; // TODO: get this for real after we start using a REST endpoint
56-
if (connectionType == DatabaseClient.ConnectionType.GATEWAY) {
57-
forests.add(
58-
new GatewayForestConfiguration.GatewayForest(client.getHost(), database, name, id, isUpdateable, isDeleteOnly)
59-
);
60-
} else {
61-
String host = forestNode.get("host").asText();
62-
String openReplicaHost = null;
63-
if ( forestNode.get("openReplicaHost") != null ) openReplicaHost = forestNode.get("openReplicaHost").asText();
64-
String requestHost = null;
65-
if ( forestNode.get("requestHost") != null ) requestHost = forestNode.get("requestHost").asText();
66-
String alternateHost = null;
67-
if ( forestNode.get("alternateHost") != null ) alternateHost = forestNode.get("alternateHost").asText();
68-
// Since we added the forestinfo end point to populate both alternateHost and requestHost
69-
// in case we have a requestHost so that we don't break the existing API code, we will make the
70-
// alternateHost as null if both alternateHost and requestHost is set.
71-
if ( requestHost != null && alternateHost != null )
72-
alternateHost = null;
73-
forests.add(
71+
forests.add(
7472
new ForestImpl(host, openReplicaHost, requestHost, alternateHost, database, name, id, isUpdateable, isDeleteOnly)
75-
);
76-
}
73+
);
7774
}
7875

79-
Forest[] forestdefs = forests.toArray(new ForestImpl[forests.size()]);
80-
return (connectionType == DatabaseClient.ConnectionType.GATEWAY) ?
81-
new GatewayForestConfiguration(client.getHost(), forestdefs) : new ForestConfigurationImpl(forestdefs);
76+
return new ForestConfigurationImpl(forests.toArray(new ForestImpl[forests.size()]));
8277
}
8378

8479
public JobTicket startJob(WriteBatcher batcher, ConcurrentHashMap<String, JobTicket> activeJobs) {

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/GatewayForestConfiguration.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
6161
private static Logger logger = LoggerFactory.getLogger(QueryBatcherImpl.class);
6262
private QueryDefinition query;
6363
private Iterator<String> iterator;
64-
private DataMovementManager moveMgr;
6564
private boolean threadCountSet = false;
6665
private List<QueryBatchListener> urisReadyListeners = new ArrayList<>();
6766
private List<QueryFailureListener> failureListeners = new ArrayList<>();
@@ -83,16 +82,14 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
8382
private Thread runJobCompletionListeners;
8483

8584
public QueryBatcherImpl(QueryDefinition query, DataMovementManager moveMgr, ForestConfiguration forestConfig) {
86-
super();
87-
this.moveMgr = moveMgr;
85+
super(moveMgr);
8886
this.query = query;
8987
withForestConfig(forestConfig);
9088
withBatchSize(1000);
9189
}
9290

9391
public QueryBatcherImpl(Iterator<String> iterator, DataMovementManager moveMgr, ForestConfiguration forestConfig) {
94-
super();
95-
this.moveMgr = moveMgr;
92+
super(moveMgr);
9693
this.iterator = iterator;
9794
withForestConfig(forestConfig);
9895
withBatchSize(1000);
@@ -161,7 +158,7 @@ private void retry(QueryEvent queryEvent, boolean callFailListeners) {
161158
long start = queryEvent.getForestResultsSoFar() + 1;
162159
logger.trace("retryForest: {}, retryHost: {}, start: {}",
163160
retryForest.getForestName(), retryForest.getPreferredHost(), start);
164-
QueryTask runnable = new QueryTask(moveMgr, this, retryForest, query,
161+
QueryTask runnable = new QueryTask(getMoveMgr(), this, retryForest, query,
165162
queryEvent.getForestBatchNumber(), start, queryEvent.getJobBatchNumber(), callFailListeners);
166163
runnable.run();
167164
}
@@ -178,7 +175,7 @@ public void retryListener(QueryBatch batch, QueryBatchListener queryBatchListene
178175
Forest[] forests = batch.getBatcher().getForestConfig().listForests();
179176
for(Forest forest : forests) {
180177
if(forest.equals(batch.getForest()))
181-
client = ((DataMovementManagerImpl) moveMgr).getForestClient(forest);
178+
client = getMoveMgr().getForestClient(forest);
182179
}
183180
QueryBatchImpl retryBatch = new QueryBatchImpl()
184181
.withClient( client )
@@ -435,7 +432,7 @@ public synchronized QueryBatcher withForestConfig(ForestConfiguration forestConf
435432
List<DatabaseClient> newClientList = new ArrayList<>();
436433
for ( String host : hosts.keySet() ) {
437434
Forest forest = hosts.get(host);
438-
DatabaseClient client = ((DataMovementManagerImpl) moveMgr).getForestClient(forest);
435+
DatabaseClient client = getMoveMgr().getForestClient(forest);
439436
newClientList.add(client);
440437
}
441438
clientList.set(newClientList);
@@ -462,7 +459,7 @@ private synchronized void calucluateDeltas(Set<Forest> oldForests, Forest[] fore
462459
blackListedForests.remove(forest);
463460
}
464461
if ( blackListedForests.size() > 0 ) {
465-
DataMovementManagerImpl moveMgrImpl = (DataMovementManagerImpl) moveMgr;
462+
DataMovementManagerImpl moveMgrImpl = getMoveMgr();
466463
String primaryHost = moveMgrImpl.getPrimaryClient().getHost();
467464
if ( getHostNames(blackListedForests).contains(primaryHost) ) {
468465
int randomPos = Math.abs(primaryHost.hashCode()) % clientList.get().size();
@@ -504,7 +501,7 @@ private synchronized void cleanupExistingTasks(Set<Forest> addedForests, Set<For
504501
for ( Forest forest : addedForests ) {
505502
// we don't need to worry about consistentSnapshotFirstQueryHasRun because that's already done
506503
// or we wouldn't be here because we wouldn't have a synchronized lock on this
507-
threadPool.execute(new QueryTask(moveMgr, this, forest, query, 1, 1));
504+
threadPool.execute(new QueryTask(getMoveMgr(), this, forest, query, 1, 1));
508505
}
509506
if ( restartedForests.size() > 0 ) {
510507
logger.warn("re-adding jobs related to forests [{}] to the queue", getForestNames(restartedForests));
@@ -548,7 +545,7 @@ private List<String> getHostNames(Collection<Forest> forests) {
548545
private synchronized void startQuerying() {
549546
boolean consistentSnapshotFirstQueryHasRun = false;
550547
for ( Forest forest : getForestConfig().listForests() ) {
551-
QueryTask runnable = new QueryTask(moveMgr, this, forest, query, 1, 1);
548+
QueryTask runnable = new QueryTask(getMoveMgr(), this, forest, query, 1, 1);
552549
if ( consistentSnapshot == true && consistentSnapshotFirstQueryHasRun == false ) {
553550
// let's run this first time in-line so we'll have the serverTimestamp set
554551
// before we launch all the parallel threads
@@ -712,8 +709,10 @@ private void shutdownIfAllForestsAreDone() {
712709
}
713710
// if we made it this far, all forests are done. let's run the Job
714711
// completion listeners and shutdown.
715-
synchronized(this) {
716-
if ( !runJobCompletionListeners.isAlive() ) runJobCompletionListeners.start();
712+
if ( !runJobCompletionListeners.isAlive() ) {
713+
synchronized(this) {
714+
if ( !runJobCompletionListeners.isAlive() ) runJobCompletionListeners.start();
715+
}
717716
}
718717
threadPool.shutdown();
719718
}
@@ -950,7 +949,7 @@ protected void terminated() {
950949

951950
@Override
952951
public DatabaseClient getPrimaryClient() {
953-
return ((DataMovementManagerImpl) moveMgr).getPrimaryClient();
952+
return getMoveMgr().getPrimaryClient();
954953
}
955954

956955
@Override

0 commit comments

Comments
 (0)