Skip to content

Commit 200e174

Browse files
ehennumehennum
authored andcommitted
specify connection type as direct or gateway on client #999
1 parent b104c9a commit 200e174

24 files changed

+211
-169
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/DatabaseClient.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,19 @@
3434
import com.marklogic.client.pojo.PojoRepository;
3535
import com.marklogic.client.semantics.GraphManager;
3636
import com.marklogic.client.semantics.SPARQLQueryManager;
37-
import com.marklogic.client.DatabaseClientFactory.Authentication;
38-
import com.marklogic.client.DatabaseClientFactory.SSLHostnameVerifier;
3937
import com.marklogic.client.DatabaseClientFactory.SecurityContext;
4038

41-
import javax.net.ssl.SSLContext;
42-
4339
/**
4440
* A Database Client instantiates document and query managers and other objects
4541
* with shared access to a database.
4642
*/
4743
public interface DatabaseClient {
44+
/**
45+
* Identifies whether the client connects directly to MarkLogic (the default) or
46+
* by means of a gateway such as a load balancer.
47+
*/
48+
enum ConnectionType {DIRECT, GATEWAY}
49+
4850
/**
4951
* Starts a transaction. You can pass the transaction to the read(), write(), or delete() methods
5052
* of a document manager or the search() method of a query manager to perform operations within a
@@ -100,22 +102,18 @@ public interface DatabaseClient {
100102
XMLDocumentManager newXMLDocumentManager();
101103

102104
/**
103-
* Creates a manager for long-running asynchronous write or query jobs
104-
* with the default ConnectionPolicy.FOREST_HOSTS policy of creating
105-
* a new connection for each host that has forests for the database.
105+
* Creates a manager for long-running asynchronous write or query jobs.
106+
* When the primary database client has the default ConnectionType.DIRECT
107+
* connection type, the DataMovementManager creates a new connection
108+
* for each host that has forests for the database. When the primary
109+
* database client has the ConnectionType.GATEWAY connection type
110+
* (for instance, when connecting to a load balancer), the DataMovementManager
111+
* uses the primary database client for all communication.
106112
* Don't forget to call dataMovementManager.release() when you're done with it.
107113
* @return a manager supporting long-running asynchronous write or query jobs
108114
*/
109115
DataMovementManager newDataMovementManager();
110116

111-
/**
112-
* Creates a manager for long-running asynchronous write or query jobs. Don't forget
113-
* to call dataMovementManager.release() when you're done with it.
114-
* @param policy how the DataMovementManager should create connections
115-
* @return a manager supporting long-running asynchronous write or query jobs
116-
*/
117-
DataMovementManager newDataMovementManager(ConnectionPolicy policy);
118-
119117
/**
120118
* Creates a manager to query for database documents.
121119
* @return a manager supporting search operations and lookup of values and tuples in indexes (also known as lexicons)
@@ -223,10 +221,10 @@ public interface DatabaseClient {
223221
ServerEvaluationCall newServerEval();
224222

225223
/**
226-
* Specifies whether to connect only using the host from the initial database connection
227-
* or to create a new connection for each host that has forests for the database.
224+
* How the client connects to MarkLogic.
225+
* @return the connection type
228226
*/
229-
public enum ConnectionPolicy{PRIMARY_HOST, FOREST_HOSTS}
227+
ConnectionType getConnectionType();
230228

231229
String getHost();
232230

marklogic-client-api/src/main/java/com/marklogic/client/DatabaseClientFactory.java

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,25 @@ static public DatabaseClient newClient(String host, int port, String database) {
687687
* @return a new client for making database requests
688688
*/
689689
static public DatabaseClient newClient(String host, int port, SecurityContext securityContext) {
690-
return newClient(host, port, null, securityContext);
690+
return newClient(host, port, null, securityContext, null);
691+
}
692+
693+
/**
694+
* Creates a client to access the database by means of a REST server.
695+
*
696+
* @param host the host with the REST server
697+
* @param port the port for the REST server
698+
* @param securityContext the security context created depending upon the
699+
* authentication type - BasicAuthContext, DigestAuthContext or KerberosAuthContext
700+
* and communication channel type (SSL)
701+
* @param connectionType whether the client connects directly to the MarkLogic host
702+
* or using a gateway such as a load balancer
703+
* @return a new client for making database requests
704+
*/
705+
static public DatabaseClient newClient(String host, int port, SecurityContext securityContext,
706+
DatabaseClient.ConnectionType connectionType)
707+
{
708+
return newClient(host, port, null, securityContext, connectionType);
691709
}
692710

693711
/**
@@ -703,6 +721,27 @@ static public DatabaseClient newClient(String host, int port, SecurityContext se
703721
* @return a new client for making database requests
704722
*/
705723
static public DatabaseClient newClient(String host, int port, String database, SecurityContext securityContext) {
724+
return newClient(host, port, null, securityContext, null);
725+
}
726+
727+
/**
728+
* Creates a client to access the database by means of a REST server.
729+
*
730+
* @param host the host with the REST server
731+
* @param port the port for the REST server
732+
* @param database the database to access (default: configured database for
733+
* the REST server)
734+
* @param securityContext the security context created depending upon the
735+
* authentication type - BasicAuthContext, DigestAuthContext or KerberosAuthContext
736+
* and communication channel type (SSL)
737+
* @param connectionType whether the client connects directly to the MarkLogic host
738+
* or using a gateway such as a load balancer
739+
* @return a new client for making database requests
740+
*/
741+
static public DatabaseClient newClient(String host, int port, String database,
742+
SecurityContext securityContext,
743+
DatabaseClient.ConnectionType connectionType)
744+
{
706745
String user = null;
707746
String password = null;
708747
Authentication type = null;
@@ -783,7 +822,9 @@ static public DatabaseClient newClient(String host, int port, String database, S
783822
}
784823
}
785824

786-
DatabaseClientImpl client = new DatabaseClientImpl(services, host, port, database, securityContext);
825+
DatabaseClientImpl client = new DatabaseClientImpl(
826+
services, host, port, database, securityContext, connectionType
827+
);
787828
client.setHandleRegistry(getHandleRegistry().copy());
788829
return client;
789830
}
@@ -816,7 +857,7 @@ static private SecurityContext makeSecurityContext(String user, String password,
816857
*/
817858
@Deprecated
818859
static public DatabaseClient newClient(String host, int port, String user, String password, Authentication type) {
819-
return newClient(host, port, null, makeSecurityContext(user, password, type, null, null));
860+
return newClient(host, port, null, makeSecurityContext(user, password, type, null, null), null);
820861
}
821862
/**
822863
* Creates a client to access the database by means of a REST server.
@@ -832,7 +873,7 @@ static public DatabaseClient newClient(String host, int port, String user, Strin
832873
*/
833874
@Deprecated
834875
static public DatabaseClient newClient(String host, int port, String database, String user, String password, Authentication type) {
835-
return newClient(host, port, database, makeSecurityContext(user, password, type, null, null));
876+
return newClient(host, port, database, makeSecurityContext(user, password, type, null, null), null);
836877
}
837878
/**
838879
* Creates a client to access the database by means of a REST server.
@@ -848,7 +889,7 @@ static public DatabaseClient newClient(String host, int port, String database, S
848889
*/
849890
@Deprecated
850891
static public DatabaseClient newClient(String host, int port, String user, String password, Authentication type, SSLContext context) {
851-
return newClient(host, port, null, makeSecurityContext(user, password, type, context, SSLHostnameVerifier.COMMON));
892+
return newClient(host, port, null, makeSecurityContext(user, password, type, context, SSLHostnameVerifier.COMMON), null);
852893
}
853894
/**
854895
* Creates a client to access the database by means of a REST server.
@@ -865,7 +906,7 @@ static public DatabaseClient newClient(String host, int port, String user, Strin
865906
*/
866907
@Deprecated
867908
static public DatabaseClient newClient(String host, int port, String database, String user, String password, Authentication type, SSLContext context) {
868-
return newClient(host, port, database, makeSecurityContext(user, password, type, context, SSLHostnameVerifier.COMMON));
909+
return newClient(host, port, database, makeSecurityContext(user, password, type, context, SSLHostnameVerifier.COMMON), null);
869910
}
870911
/**
871912
* Creates a client to access the database by means of a REST server.
@@ -882,7 +923,7 @@ static public DatabaseClient newClient(String host, int port, String database, S
882923
*/
883924
@Deprecated
884925
static public DatabaseClient newClient(String host, int port, String user, String password, Authentication type, SSLContext context, SSLHostnameVerifier verifier) {
885-
return newClient(host, port, null, makeSecurityContext(user, password, type, context, verifier));
926+
return newClient(host, port, null, makeSecurityContext(user, password, type, context, verifier), null);
886927
}
887928
/**
888929
* Creates a client to access the database by means of a REST server.
@@ -900,7 +941,7 @@ static public DatabaseClient newClient(String host, int port, String user, Strin
900941
*/
901942
@Deprecated
902943
static public DatabaseClient newClient(String host, int port, String database, String user, String password, Authentication type, SSLContext context, SSLHostnameVerifier verifier) {
903-
return newClient(host, port, database, makeSecurityContext(user, password, type, context, verifier));
944+
return newClient(host, port, database, makeSecurityContext(user, password, type, context, verifier), null);
904945
}
905946

906947
/**
@@ -978,6 +1019,7 @@ static public class Bean implements Serializable {
9781019
private Authentication authentication;
9791020
private String externalName;
9801021
private SecurityContext securityContext;
1022+
private DatabaseClient.ConnectionType connectionType;
9811023
private HandleFactoryRegistry handleRegistry =
9821024
HandleFactoryRegistryImpl.newDefault();
9831025

@@ -1189,6 +1231,24 @@ public SecurityContext getSecurityContext() {
11891231
public void setSecurityContext(SecurityContext securityContext) {
11901232
this.securityContext = securityContext;
11911233
}
1234+
/**
1235+
* Identifies whether the client connects directly with a MarkLogic host
1236+
* or by means of a gateway such as a load balancer.
1237+
* @return the connection type
1238+
*/
1239+
public DatabaseClient.ConnectionType getConnectionType() {
1240+
return connectionType;
1241+
}
1242+
/**
1243+
* Specify whether the client connects directly with a MarkLogic host
1244+
* or by means of a gateway such as a load balancer.
1245+
* @param connectionType the connection type
1246+
*/
1247+
public void setConnectionType(DatabaseClient.ConnectionType connectionType) {
1248+
this.connectionType = connectionType;
1249+
}
1250+
1251+
11921252
/**
11931253
* Returns the registry for associating
11941254
* IO representation classes with handle factories.
@@ -1222,8 +1282,10 @@ public void registerDefaultHandles() {
12221282
* @return a new client for making database requests
12231283
*/
12241284
public DatabaseClient newClient() {
1225-
DatabaseClientImpl client = (DatabaseClientImpl) DatabaseClientFactory.newClient(host, port, database,
1226-
makeSecurityContext(user, password, authentication, context, verifier));
1285+
DatabaseClientImpl client = (DatabaseClientImpl) DatabaseClientFactory.newClient(
1286+
host, port, database,
1287+
makeSecurityContext(user, password, authentication, context, verifier),
1288+
connectionType);
12271289
client.setHandleRegistry(getHandleRegistry().copy());
12281290
return client;
12291291
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DataMovementManager.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616
package com.marklogic.client.datamovement;
1717

1818
import com.marklogic.client.DatabaseClient;
19-
import com.marklogic.client.query.QueryDefinition;
2019
import com.marklogic.client.query.StringQueryDefinition;
2120
import com.marklogic.client.query.StructuredQueryDefinition;
2221
import com.marklogic.client.query.RawCombinedQueryDefinition;
2322
import com.marklogic.client.query.RawStructuredQueryDefinition;
24-
import com.marklogic.client.datamovement.impl.DataMovementManagerImpl;
2523

2624
import java.util.Iterator;
2725

@@ -186,9 +184,10 @@ public interface DataMovementManager {
186184
public ForestConfiguration readForestConfig();
187185

188186
/**
189-
* Identify the DataMovementManager uses only the primary database connection
190-
* or manages a connection for each host that has forests for the database.
191-
* @return the connection policy
187+
* Identify whether the DataMovementManager connects directly to each MarkLogic host
188+
* with a forest for the database or whether the DataMovementManager uses a gateway
189+
* such as a load balancer to communicate with the MarkLogic hosts.
190+
* @return the connection type
192191
*/
193-
public DatabaseClient.ConnectionPolicy getConnectionPolicy();
192+
public DatabaseClient.ConnectionType getConnectionType();
194193
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/HostAvailabilityListener.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,6 @@ public class HostAvailabilityListener implements QueryFailureListener, WriteFail
7070
private ScheduledFuture<?> future;
7171
Set<QueryBatchListener> retryListenersSet = new HashSet<>();
7272
List<Class<?>> hostUnavailableExceptions = new ArrayList<>();
73-
{
74-
hostUnavailableExceptions.add(SocketException.class);
75-
hostUnavailableExceptions.add(SSLException.class);
76-
hostUnavailableExceptions.add(UnknownHostException.class);
77-
}
7873

7974
// Retry listener for Query batches, for which the list of URIs have been
8075
// retrieved from the server but the batch failed while applying the listener
@@ -109,6 +104,11 @@ public void processFailure(QueryBatch batch, Throwable throwable) {
109104
public HostAvailabilityListener(DataMovementManager moveMgr) {
110105
if (moveMgr == null) throw new IllegalArgumentException("moveMgr must not be null");
111106
this.moveMgr = moveMgr;
107+
if (moveMgr.getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
108+
hostUnavailableExceptions.add(SocketException.class);
109+
hostUnavailableExceptions.add(SSLException.class);
110+
hostUnavailableExceptions.add(UnknownHostException.class);
111+
}
112112
}
113113

114114
/** If a host becomes unavailable (SocketException, SSLException,
@@ -132,7 +132,7 @@ public HostAvailabilityListener withSuspendTimeForHostUnavailable(Duration
132132
* @return this instance (for method chaining)
133133
*/
134134
public HostAvailabilityListener withMinHosts(int numHosts) {
135-
if (moveMgr.getConnectionPolicy() == DatabaseClient.ConnectionPolicy.PRIMARY_HOST) {
135+
if (moveMgr.getConnectionType() == DatabaseClient.ConnectionType.GATEWAY) {
136136
if (numHosts != 1) {
137137
throw new IllegalArgumentException("numHosts must be 1 when using only the primary host for the connection");
138138
}
@@ -224,22 +224,20 @@ public void processFailure(QueryBatchException queryBatch) {
224224
}
225225

226226
private synchronized boolean processException(Batcher batcher, Throwable throwable, String host) {
227-
// we only do something if this throwable is on our list of exceptions
228-
// which we consider marking a host as unavilable
229-
boolean isHostUnavailableException = isHostUnavailableException(throwable, new HashSet<>());
230-
return (moveMgr.getConnectionPolicy() == DatabaseClient.ConnectionPolicy.PRIMARY_HOST) ?
231-
processPrimaryHostException(batcher, throwable, host, isHostUnavailableException) :
232-
processForestHostException(batcher, throwable, host, isHostUnavailableException);
227+
return (moveMgr.getConnectionType() == DatabaseClient.ConnectionType.GATEWAY) ?
228+
processGatewayException(batcher, throwable, host) :
229+
processForestHostException(batcher, throwable, host);
233230
}
234231

235-
private boolean processPrimaryHostException(Batcher batcher, Throwable throwable, String host, boolean isHostUnavailableException) {
236-
// TODO: currently, cancelling the job because the load balancer is down would require
237-
// a custom listener; determine whether the approach could be simplified with an array
238-
// of exceptions different from the exceptions that indicate the MarkLogic host is unavailable
239-
return isHostUnavailableException;
232+
private boolean processGatewayException(Batcher batcher, Throwable throwable, String host) {
233+
// if the nested retry failed, assume the MarkLogic cluster is unavailable
234+
return false;
240235
}
241236

242-
private boolean processForestHostException(Batcher batcher, Throwable throwable, String host, boolean isHostUnavailableException) {
237+
private boolean processForestHostException(Batcher batcher, Throwable throwable, String host) {
238+
// we only do something if this throwable is on our list of exceptions
239+
// which we consider marking a host as unavilable
240+
boolean isHostUnavailableException = isHostUnavailableException(throwable, new HashSet<>());
243241
boolean shouldWeRetry = isHostUnavailableException;
244242
if ( isHostUnavailableException == true ) {
245243
ForestConfiguration existingForestConfig = batcher.getForestConfig();

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,15 @@
3030
import com.marklogic.client.datamovement.JobTicket;
3131
import com.marklogic.client.datamovement.NoResponseListener;
3232
import com.marklogic.client.datamovement.QueryBatcher;
33-
import com.marklogic.client.datamovement.impl.QueryJobReportListener;
3433
import com.marklogic.client.datamovement.WriteBatcher;
3534
import com.marklogic.client.datamovement.JobReport;
36-
import com.marklogic.client.datamovement.impl.WriteJobReportListener;
37-
import com.marklogic.client.datamovement.impl.QueryBatcherImpl;
38-
import com.marklogic.client.datamovement.impl.DataMovementServices;
39-
import com.marklogic.client.datamovement.impl.WriteBatcherImpl;
4035

4136
import org.slf4j.Logger;
4237
import org.slf4j.LoggerFactory;
4338

4439
import java.util.HashMap;
4540
import java.util.Iterator;
46-
import java.util.List;
4741
import java.util.Map;
48-
import java.util.Random;
4942
import java.util.concurrent.ConcurrentHashMap;
5043

5144
public class DataMovementManagerImpl implements DataMovementManager {
@@ -54,17 +47,19 @@ public class DataMovementManagerImpl implements DataMovementManager {
5447
private static ConcurrentHashMap<String, JobTicket> activeJobs = new ConcurrentHashMap<>();
5548
private ForestConfiguration forestConfig;
5649
private DatabaseClient primaryClient;
57-
private DatabaseClient.ConnectionPolicy connectPolicy;
50+
private DatabaseClient.ConnectionType connectionType;
5851
// clientMap key is the hostname_database
5952
private Map<String,DatabaseClient> clientMap = new HashMap<>();
6053

61-
public DataMovementManagerImpl(DatabaseClient client, DatabaseClient.ConnectionPolicy connectPolicy) {
62-
setPrimaryClient(client);
63-
clientMap.put(primaryClient.getHost(), primaryClient);
64-
this.connectPolicy = connectPolicy;
65-
if (connectPolicy == DatabaseClient.ConnectionPolicy.PRIMARY_HOST) {
54+
public DataMovementManagerImpl(DatabaseClient client) {
55+
connectionType = client.getConnectionType();
56+
if (connectionType == DatabaseClient.ConnectionType.GATEWAY) {
6657
forestConfig = new AnyForestConfiguration(client);
6758
}
59+
60+
setPrimaryClient(client);
61+
62+
clientMap.put(primaryClient.getHost(), primaryClient);
6863
}
6964

7065
@Override
@@ -168,7 +163,7 @@ private ForestConfiguration getForestConfig() {
168163

169164
@Override
170165
public ForestConfiguration readForestConfig() {
171-
if (connectPolicy == DatabaseClient.ConnectionPolicy.FOREST_HOSTS) {
166+
if (getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
172167
forestConfig = service.readForestConfig();
173168
}
174169
return forestConfig;
@@ -208,8 +203,8 @@ public JobTicket getActiveJob(String jobId) {
208203
}
209204

210205
@Override
211-
public DatabaseClient.ConnectionPolicy getConnectionPolicy() {
212-
return connectPolicy;
206+
public DatabaseClient.ConnectionType getConnectionType() {
207+
return connectionType;
213208
}
214209

215210
public DataMovementServices getDataMovementServices() {

0 commit comments

Comments
 (0)