Skip to content

Commit 232ab63

Browse files
ehennumehennum
authored andcommitted
get forest configuration even in load balancer case #999
1 parent e5b90af commit 232ab63

File tree

7 files changed

+84
-94
lines changed

7 files changed

+84
-94
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/FilteredForestConfiguration.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@
1616
package com.marklogic.client.datamovement;
1717

1818
import com.marklogic.client.datamovement.Forest.HostType;
19-
import com.marklogic.client.datamovement.impl.AnyForestConfiguration;
20-
import com.marklogic.client.datamovement.impl.ForestConfigurationImpl;
19+
import com.marklogic.client.datamovement.impl.GatewayForestConfiguration;
2120
import com.marklogic.client.datamovement.impl.ForestImpl;
2221

23-
import java.util.ArrayList;
24-
import java.util.Arrays;
2522
import java.util.HashMap;
2623
import java.util.HashSet;
27-
import java.util.List;
2824
import java.util.Map;
2925
import java.util.Set;
30-
import java.util.stream.Collectors;
3126
import java.util.stream.Stream;
3227

3328
/**
@@ -66,8 +61,8 @@ public class FilteredForestConfiguration implements ForestConfiguration {
6661
* @param forestConfig the ForestConfiguration to wrap
6762
*/
6863
public FilteredForestConfiguration(ForestConfiguration forestConfig) {
69-
if (forestConfig instanceof AnyForestConfiguration) {
70-
throw new IllegalArgumentException("cannot filter the forest hosts when using only the primary host");
64+
if (forestConfig instanceof GatewayForestConfiguration) {
65+
throw new IllegalArgumentException("cannot filter the forest hosts when connecting using a gateway");
7166
}
7267
this.wrappedForestConfig = forestConfig;
7368
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/HostAvailabilityListener.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public HostAvailabilityListener withMinHosts(int numHosts) {
138138
}
139139
} else {
140140
if (numHosts <= 0) throw new IllegalArgumentException("numHosts must be > 0");
141+
// TODO: use existing forest configuration instead of refreshing?
141142
int numConfigHosts = moveMgr.readForestConfig().getPreferredHosts().length;
142143
if (numHosts > numConfigHosts) throw new IllegalArgumentException
143144
("numHosts must be less than or equal to the number of hosts in the cluster");
@@ -274,23 +275,7 @@ private boolean processForestHostException(Batcher batcher, Throwable throwable,
274275
filteredForestConfig = filteredForestConfig.withRenamedHost(host, randomAvailableHost);
275276
}
276277
batcher.withForestConfig(filteredForestConfig);
277-
// cancel any previously scheduled re-sync
278-
if ( future != null ) future.cancel(false);
279-
// schedule a re-sync with the server forest config
280-
future = Executors.newScheduledThreadPool(1)
281-
.schedule( () -> {
282-
if ( batcher.isStopped() ) {
283-
logger.debug("Job \"{}\" is stopped, so cancelling re-sync with the server forest config",
284-
batcher.getJobName());
285-
} else {
286-
ForestConfiguration updatedForestConfig = moveMgr.readForestConfig();
287-
logger.info("it's been {} since host {} failed, opening communication to all server hosts [{}]",
288-
suspendTimeForHostUnavailable.toString(), host, Arrays.asList(updatedForestConfig.getPreferredHosts()));
289-
// set the forestConfig back to whatever the server says it is
290-
batcher.withForestConfig(updatedForestConfig);
291-
}
292-
}
293-
, suspendTimeForHostUnavailable.toMillis(), TimeUnit.MILLISECONDS);
278+
scheduleForestResynch(batcher, host);
294279
} else {
295280
// by black-listing this host we'd move below minHosts, so it's time to
296281
// stop this job
@@ -304,6 +289,26 @@ private boolean processForestHostException(Batcher batcher, Throwable throwable,
304289
return shouldWeRetry;
305290
}
306291

292+
private void scheduleForestResynch(Batcher batcher, String host) {
293+
// cancel any previously scheduled re-sync
294+
if ( future != null ) future.cancel(false);
295+
// schedule a re-sync with the server forest config
296+
future = Executors.newScheduledThreadPool(1)
297+
.schedule( () -> {
298+
if ( batcher.isStopped() ) {
299+
logger.debug("Job \"{}\" is stopped, so cancelling re-sync with the server forest config",
300+
batcher.getJobName());
301+
} else {
302+
ForestConfiguration updatedForestConfig = moveMgr.readForestConfig();
303+
logger.info("it's been {} since host {} failed, opening communication to all server hosts [{}]",
304+
suspendTimeForHostUnavailable.toString(), host, Arrays.asList(updatedForestConfig.getPreferredHosts()));
305+
// set the forestConfig back to whatever the server says it is
306+
batcher.withForestConfig(updatedForestConfig);
307+
}
308+
}
309+
, suspendTimeForHostUnavailable.toMillis(), TimeUnit.MILLISECONDS);
310+
}
311+
307312
protected boolean isHostUnavailableException(Throwable throwable, Set<Throwable> path) {
308313
for ( Class<?> type : hostUnavailableExceptions ) {
309314
if ( type.isInstance(throwable) ) {

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/AnyForestConfiguration.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,10 @@ public class DataMovementManagerImpl implements DataMovementManager {
4747
private static ConcurrentHashMap<String, JobTicket> activeJobs = new ConcurrentHashMap<>();
4848
private ForestConfiguration forestConfig;
4949
private DatabaseClient primaryClient;
50-
private DatabaseClient.ConnectionType connectionType;
5150
// clientMap key is the hostname_database
5251
private Map<String,DatabaseClient> clientMap = new HashMap<>();
5352

5453
public DataMovementManagerImpl(DatabaseClient client) {
55-
connectionType = client.getConnectionType();
56-
if (connectionType == DatabaseClient.ConnectionType.GATEWAY) {
57-
forestConfig = new AnyForestConfiguration(client);
58-
}
59-
6054
setPrimaryClient(client);
6155

6256
clientMap.put(primaryClient.getHost(), primaryClient);
@@ -163,9 +157,7 @@ private ForestConfiguration getForestConfig() {
163157

164158
@Override
165159
public ForestConfiguration readForestConfig() {
166-
if (getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
167-
forestConfig = service.readForestConfig();
168-
}
160+
forestConfig = service.readForestConfig();
169161
return forestConfig;
170162
}
171163

@@ -204,7 +196,7 @@ public JobTicket getActiveJob(String jobId) {
204196

205197
@Override
206198
public DatabaseClient.ConnectionType getConnectionType() {
207-
return connectionType;
199+
return primaryClient.getConnectionType();
208200
}
209201

210202
public DataMovementServices getDataMovementServices() {

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementServices.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,10 @@
2121

2222
import com.fasterxml.jackson.databind.JsonNode;
2323
import com.marklogic.client.DatabaseClient;
24+
import com.marklogic.client.datamovement.*;
2425
import com.marklogic.client.impl.DatabaseClientImpl;
2526
import com.marklogic.client.io.JacksonHandle;
26-
import com.marklogic.client.datamovement.Batcher;
27-
import com.marklogic.client.datamovement.DataMovementException;
28-
import com.marklogic.client.datamovement.JobTicket;
2927
import com.marklogic.client.datamovement.JobTicket.JobType;
30-
import com.marklogic.client.datamovement.QueryBatcher;
31-
import com.marklogic.client.datamovement.WriteBatcher;
32-
import com.marklogic.client.datamovement.JobReport;
3328
import com.marklogic.client.datamovement.impl.ForestConfigurationImpl;
3429
import java.util.List;
3530

@@ -46,6 +41,8 @@ public DataMovementServices setClient(DatabaseClient client) {
4641
}
4742

4843
public ForestConfigurationImpl readForestConfig() {
44+
DatabaseClient.ConnectionType connectionType = client.getConnectionType();
45+
4946
List<ForestImpl> forests = new ArrayList<>();
5047
JsonNode results = ((DatabaseClientImpl) client).getServices()
5148
.getResource(null, "internal/forestinfo", null, null, new JacksonHandle())
@@ -54,24 +51,34 @@ public ForestConfigurationImpl readForestConfig() {
5451
String id = forestNode.get("id").asText();
5552
String name = forestNode.get("name").asText();
5653
String database = forestNode.get("database").asText();
57-
String host = forestNode.get("host").asText();
58-
String openReplicaHost = null;
59-
if ( forestNode.get("openReplicaHost") != null ) openReplicaHost = forestNode.get("openReplicaHost").asText();
60-
String requestHost = null;
61-
if ( forestNode.get("requestHost") != null ) requestHost = forestNode.get("requestHost").asText();
62-
String alternateHost = null;
63-
if ( forestNode.get("alternateHost") != null ) alternateHost = forestNode.get("alternateHost").asText();
64-
// Since we added the forestinfo end point to populate both alternateHost and requestHost
65-
// in case we have a requestHost so that we don't break the existing API code, we will make the
66-
// alternateHost as null if both alternateHost and requestHost is set.
67-
if ( requestHost != null && alternateHost != null )
68-
alternateHost = null;
6954
boolean isUpdateable = "all".equals(forestNode.get("updatesAllowed").asText());
7055
boolean isDeleteOnly = false; // TODO: get this for real after we start using a REST endpoint
71-
forests.add(new ForestImpl(host, openReplicaHost, requestHost, alternateHost, database, name, id, isUpdateable, isDeleteOnly));
56+
if (connectionType == DatabaseClient.ConnectionType.GATEWAY) {
57+
forests.add(
58+
new GatewayForestConfiguration.GatewayForest(client.getHost(), database, name, id, isUpdateable, isDeleteOnly)
59+
);
60+
} else {
61+
String host = forestNode.get("host").asText();
62+
String openReplicaHost = null;
63+
if ( forestNode.get("openReplicaHost") != null ) openReplicaHost = forestNode.get("openReplicaHost").asText();
64+
String requestHost = null;
65+
if ( forestNode.get("requestHost") != null ) requestHost = forestNode.get("requestHost").asText();
66+
String alternateHost = null;
67+
if ( forestNode.get("alternateHost") != null ) alternateHost = forestNode.get("alternateHost").asText();
68+
// Since we added the forestinfo end point to populate both alternateHost and requestHost
69+
// in case we have a requestHost so that we don't break the existing API code, we will make the
70+
// alternateHost as null if both alternateHost and requestHost is set.
71+
if ( requestHost != null && alternateHost != null )
72+
alternateHost = null;
73+
forests.add(
74+
new ForestImpl(host, openReplicaHost, requestHost, alternateHost, database, name, id, isUpdateable, isDeleteOnly)
75+
);
76+
}
7277
}
7378

74-
return new ForestConfigurationImpl(forests.toArray(new ForestImpl[forests.size()]));
79+
Forest[] forestdefs = forests.toArray(new ForestImpl[forests.size()]);
80+
return (connectionType == DatabaseClient.ConnectionType.GATEWAY) ?
81+
new GatewayForestConfiguration(client.getHost(), forestdefs) : new ForestConfigurationImpl(forestdefs);
7582
}
7683

7784
public JobTicket startJob(WriteBatcher batcher, ConcurrentHashMap<String, JobTicket> activeJobs) {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.marklogic.client.datamovement.impl;
2+
3+
import com.marklogic.client.DatabaseClient;
4+
import com.marklogic.client.datamovement.Forest;
5+
import com.marklogic.client.datamovement.ForestConfiguration;
6+
7+
import java.util.stream.Stream;
8+
9+
public class GatewayForestConfiguration extends ForestConfigurationImpl {
10+
private String[] preferredHosts;
11+
12+
GatewayForestConfiguration(String primaryHost, Forest[] forests) {
13+
super(forests);
14+
if (primaryHost == null) throw new IllegalArgumentException("primaryHost argument must not be null");
15+
16+
this.preferredHosts = new String[]{primaryHost};
17+
}
18+
19+
@Override
20+
public String[] getPreferredHosts() {
21+
return preferredHosts;
22+
}
23+
24+
static public class GatewayForest extends ForestImpl {
25+
public GatewayForest(String primaryHost, String databaseName, String forestName, String forestId, boolean isUpdateable, boolean isDeleteOnly) {
26+
super(primaryHost, primaryHost, primaryHost, primaryHost, databaseName, forestName, forestId, isUpdateable, isDeleteOnly);
27+
}
28+
}
29+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -627,9 +627,7 @@ public void run() {
627627
// this try-with-resources block will call results.close() once the block is done
628628
// here we call the /v1/internal/uris endpoint to get the text/uri-list of documents
629629
// matching this structured or string query
630-
try ( UrisHandle results = queryMgr.uris(
631-
query, handle, start, null, (forest instanceof AnyForestConfiguration.AnyForest) ? null : forest.getForestName()
632-
) ) {
630+
try ( UrisHandle results = queryMgr.uris(query, handle, start, null, forest.getForestName()) ) {
633631
// if we're doing consistentSnapshot and this is the first result set, let's capture the
634632
// serverTimestamp so we can use it for all future queries
635633
if ( consistentSnapshot == true && serverTimestamp.get() == -1 ) {

0 commit comments

Comments
 (0)