Skip to content

Commit 19c5e69

Browse files
author
ehennum
committed
tuning interface for bulk IO endpoints #1165
1 parent db65afe commit 19c5e69

File tree

16 files changed

+243
-150
lines changed

16 files changed

+243
-150
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/ExecEndpoint.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.marklogic.client.dataservices;
1717

1818
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.SessionState;
1920
import com.marklogic.client.dataservices.impl.ExecEndpointImpl;
2021
import com.marklogic.client.io.marker.JSONWriteHandle;
2122

@@ -38,10 +39,13 @@ static ExecEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
3839
}
3940

4041
/**
41-
* Makes one call to the endpoint for the instance
42-
* @param workUnit the definition of a unit of work which should be null if not accepted by the endpoint
42+
* Makes one call to the endpoint for the instance.
43+
* @param endpointState the current mutable state of the endpoint (which must be null if not accepted by the endpoint)
44+
* @param session the identifier for the server cache of the endpoint (which must be null if not accepted by the endpoint)
45+
* @param workUnit the definition of a unit of work (which must be null if not accepted by the endpoint)
46+
* @return the endpoint state for the next call, if returned by the endpoint, or null
4347
*/
44-
void call(InputStream workUnit);
48+
InputStream call(InputStream endpointState, SessionState session, InputStream workUnit);
4549

4650
/**
4751
* Constructs an instance of a bulk caller, which completes

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/IOEndpoint.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.marklogic.client.dataservices;
1717

18+
import com.marklogic.client.SessionState;
1819
import com.marklogic.client.io.marker.BufferableHandle;
1920

2021
import java.io.InputStream;
@@ -36,6 +37,12 @@ public interface IOEndpoint {
3637
* @return whether the endpoint takes a state
3738
*/
3839
boolean allowsEndpointState();
40+
/**
41+
* Identifies whether the endpoint accepts a session identifier that
42+
* the endpoint can use to access a session cache on the server.
43+
* @return whether the endpoint takes a session
44+
*/
45+
boolean allowsSession();
3946
/**
4047
* Identifies whether the endpoint accepts a data structure that defines
4148
* the unit of work to be done by the endpoint.
@@ -49,6 +56,13 @@ public interface IOEndpoint {
4956
*/
5057
boolean allowsInput();
5158

59+
/**
60+
* Generates an identifier for an endpoint to use when accessing a session cache
61+
* on the server. The identifier can be reused for multiple calls.
62+
* @return a new identifier for a session cache on the server
63+
*/
64+
SessionState newSessionState();
65+
5266
/**
5367
* Base interface providing the methods common to all bulk endpoint callers.
5468
*/

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputEndpoint.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
package com.marklogic.client.dataservices;
1717

1818
import java.io.InputStream;
19-
import java.util.stream.Stream;
2019

2120
import com.marklogic.client.DatabaseClient;
21+
import com.marklogic.client.SessionState;
2222
import com.marklogic.client.dataservices.impl.InputEndpointImpl;
2323
import com.marklogic.client.io.marker.JSONWriteHandle;
2424

@@ -35,12 +35,16 @@ public interface InputEndpoint extends IOEndpoint {
3535
static InputEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
3636
return new InputEndpointImpl(client, apiDecl);
3737
}
38+
3839
/**
39-
* Makes one call to the endpoint for the instance
40-
* @param workUnit the definition of a unit of work
41-
* @param input the stream given to the endpoint as input.
42-
*/
43-
void call(InputStream workUnit, Stream<InputStream> input);
40+
* Makes one call to the endpoint for the instance
41+
* @param endpointState the current mutable state of the endpoint (which must be null if not accepted by the endpoint)
42+
* @param session the identifier for the server cache of the endpoint (which must be null if not accepted by the endpoint)
43+
* @param workUnit the definition of a unit of work (which must be null if not accepted by the endpoint)
44+
* @param input the request data sent to the endpoint
45+
* @return the endpoint state for the next call, if returned by the endpoint, or null
46+
*/
47+
InputStream call(InputStream endpointState, SessionState session, InputStream workUnit, InputStream[] input);
4448
/**
4549
* Constructs an instance of a bulk caller, which completes
4650
* a unit of work by repeated calls to the endpoint.
@@ -54,9 +58,16 @@ static InputEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
5458
*/
5559
interface BulkInputCaller extends IOEndpoint.BulkIOEndpointCaller {
5660
/**
57-
* Accepts the input for the input endpoint.
58-
* @param input the stream given to the endpoint as input.
61+
* Accepts an input item for the endpoint. Items are queued
62+
* and submitted to the endpoint in batches.
63+
* @param input one input item
5964
*/
6065
void accept(InputStream input);
66+
/**
67+
* Accepts multiple input items for the endpoint. Items are queued
68+
* and submitted to the endpoint in batches.
69+
* @param input multiple input items.
70+
*/
71+
void acceptAll(InputStream[] input);
6172
}
6273
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputOutputEndpoint.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,31 @@
1616
package com.marklogic.client.dataservices;
1717

1818
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.SessionState;
1920
import com.marklogic.client.dataservices.impl.InputOutputEndpointImpl;
2021
import com.marklogic.client.io.marker.JSONWriteHandle;
2122

2223
import java.io.InputStream;
2324
import java.util.function.Consumer;
24-
import java.util.stream.Stream;
2525

2626
/**
2727
* Provides an interface for calling an endpoint that takes input data structures and
2828
* returns output data structures.
2929
*/
30-
public interface InputOutputEndpoint {
30+
public interface InputOutputEndpoint extends IOEndpoint {
3131
static InputOutputEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
3232
return new InputOutputEndpointImpl(client, apiDecl);
3333
}
3434

3535
/**
3636
* Makes one call to the endpoint for the instance
37-
* @param workUnit the definition of a unit of work
38-
* @param input the stream given to the endpoint as input.
39-
* @return the response from the endpoint
37+
* @param endpointState the current mutable state of the endpoint (which must be null if not accepted by the endpoint)
38+
* @param session the identifier for the server cache of the endpoint (which must be null if not accepted by the endpoint)
39+
* @param workUnit the definition of a unit of work (which must be null if not accepted by the endpoint)
40+
* @param input the request data sent to the endpoint
41+
* @return the endpoint state, if produced by the endpoint, followed by the response data from the endpoint
4042
*/
41-
Stream<InputStream> call(InputStream workUnit, Stream<InputStream> input);
43+
InputStream[] call(InputStream endpointState, SessionState session, InputStream workUnit, InputStream[] input);
4244

4345
/**
4446
* Constructs an instance of a bulk caller, which completes
@@ -58,9 +60,16 @@ interface BulkInputOutputCaller extends IOEndpoint.BulkIOEndpointCaller {
5860
*/
5961
void setOutputListener(Consumer<InputStream> listener);
6062
/**
61-
* Accepts the input for the input endpoint.
62-
* @param input the stream given to the endpoint as input.
63+
* Accepts an input item for the endpoint. Items are queued
64+
* and submitted to the endpoint in batches.
65+
* @param input one input item
6366
*/
6467
void accept(InputStream input);
68+
/**
69+
* Accepts multiple input items for the endpoint. Items are queued
70+
* and submitted to the endpoint in batches.
71+
* @param input multiple input items.
72+
*/
73+
void acceptAll(InputStream[] input);
6574
}
6675
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/OutputEndpoint.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
package com.marklogic.client.dataservices;
1717

1818
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.SessionState;
1920
import com.marklogic.client.dataservices.impl.OutputEndpointImpl;
2021
import com.marklogic.client.io.marker.JSONWriteHandle;
2122

2223
import java.io.InputStream;
2324
import java.util.function.Consumer;
24-
import java.util.stream.Stream;
2525

2626
/**
2727
* Provides an interface for calling an endpoint that returns output data structures.
@@ -39,17 +39,19 @@ static OutputEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
3939

4040
/**
4141
* Makes one call to the endpoint for the instance
42-
* @param workUnit the definition of a unit of work
43-
* @return the response from the endpoint
42+
* @param endpointState the current mutable state of the endpoint (which must be null if not accepted by the endpoint)
43+
* @param session the identifier for the server cache of the endpoint (which must be null if not accepted by the endpoint)
44+
* @param workUnit the definition of a unit of work (which must be null if not accepted by the endpoint)
45+
* @return the endpoint state, if produced by the endpoint, followed by the response data from the endpoint
4446
*/
45-
Stream<InputStream> call(InputStream workUnit);
47+
InputStream[] call(InputStream endpointState, SessionState session, InputStream workUnit);
4648

4749
/**
4850
* Constructs an instance of a bulk caller, which completes
4951
* a unit of work by repeated calls to the endpoint.
5052
* @return the bulk caller for the output endpoint
5153
*/
52-
OutputEndpoint.BulkOutputCaller bulkCaller();
54+
BulkOutputCaller bulkCaller();
5355

5456
/**
5557
* Provides an interface for completing a unit of work
@@ -65,6 +67,6 @@ interface BulkOutputCaller extends IOEndpoint.BulkIOEndpointCaller {
6567
* Provides synchronous access to output.
6668
* @return the response from the endpoint.
6769
*/
68-
Stream<InputStream> next();
70+
InputStream[] next();
6971
}
7072
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/ExecEndpointImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@ private ExecCallerImpl getCaller() {
4242
}
4343

4444
@Override
45-
public void call(InputStream workUnit) {
46-
if (workUnit != null && !allowsWorkUnit())
47-
throw new IllegalArgumentException("endpoint does not accept work unit");
48-
getCaller().call(getClient(), null, null, workUnit);
45+
public InputStream call(InputStream endpointState, SessionState session, InputStream workUnit) {
46+
checkAllowedArgs(endpointState, session, workUnit);
47+
return getCaller().call(getClient(), endpointState, session, workUnit);
4948
}
5049

5150
@Override

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOEndpointImpl.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,27 @@ public boolean allowsWorkUnit() {
113113
public boolean allowsInput() {
114114
return (getCaller().getInputParamdef() != null);
115115
}
116-
117-
boolean allowsSession() {
116+
@Override
117+
public boolean allowsSession() {
118118
return (getCaller().getSessionParamdef() != null);
119119
}
120120

121+
@Override
122+
public SessionState newSessionState() {
123+
if (!allowsEndpointState())
124+
throw new IllegalStateException("endpoint does not support session state");
125+
return getCaller().newSessionState();
126+
}
127+
128+
public void checkAllowedArgs(InputStream endpointState, SessionState session, InputStream workUnit) {
129+
if (endpointState != null && !allowsEndpointState())
130+
throw new IllegalArgumentException("endpoint does not accept endpoint state");
131+
if (session != null && !allowsSession())
132+
throw new IllegalArgumentException("endpoint does not accept session");
133+
if (workUnit != null && !allowsWorkUnit())
134+
throw new IllegalArgumentException("endpoint does not accept work unit");
135+
}
136+
121137
static abstract class BulkIOEndpointCallerImpl implements IOEndpoint.BulkIOEndpointCaller {
122138
enum WorkPhase {
123139
INITIALIZING, RUNNING, INTERRUPTING, INTERRUPTED, COMPLETED;
@@ -220,11 +236,26 @@ boolean allowsInput() {
220236
}
221237

222238
boolean queueInput(InputStream input, BlockingQueue<InputStream> queue, int batchSize) {
239+
if (input == null) return false;
223240
try {
224241
queue.put(input);
225242
} catch (InterruptedException e) {
226243
throw new IllegalStateException("InputStream was not added to the queue." + e.getMessage());
227244
}
245+
return checkQueue(queue, batchSize);
246+
}
247+
boolean queueAllInput(InputStream[] input, BlockingQueue<InputStream> queue, int batchSize) {
248+
if (input == null || input.length == 0) return false;
249+
try {
250+
for (InputStream item: input) {
251+
queue.put(item);
252+
}
253+
} catch (InterruptedException e) {
254+
throw new IllegalStateException("InputStream was not added to the queue." + e.getMessage());
255+
}
256+
return checkQueue(queue, batchSize);
257+
}
258+
boolean checkQueue(BlockingQueue<InputStream> queue, int batchSize) {
228259
if ((queue.size() % batchSize) > 0)
229260
return false;
230261

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputEndpointImpl.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,19 @@
1616
package com.marklogic.client.dataservices.impl;
1717

1818
import java.io.InputStream;
19-
import java.util.ArrayList;
20-
import java.util.List;
2119
import java.util.concurrent.LinkedBlockingQueue;
22-
import java.util.stream.Stream;
2320

2421
import org.slf4j.Logger;
2522
import org.slf4j.LoggerFactory;
2623

27-
import com.fasterxml.jackson.databind.JsonNode;
2824
import com.marklogic.client.DatabaseClient;
29-
import com.marklogic.client.MarkLogicInternalException;
3025
import com.marklogic.client.SessionState;
3126
import com.marklogic.client.dataservices.InputEndpoint;
32-
import com.marklogic.client.dataservices.impl.IOEndpointImpl;
33-
import com.marklogic.client.dataservices.impl.IOEndpointImpl.BulkIOEndpointCallerImpl.WorkPhase;
3427
import com.marklogic.client.io.marker.JSONWriteHandle;
3528

3629
public class InputEndpointImpl extends IOEndpointImpl implements InputEndpoint {
3730
private static Logger logger = LoggerFactory.getLogger(InputEndpointImpl.class);
31+
3832
private InputCallerImpl caller;
3933
private int batchSize;
4034

@@ -55,11 +49,9 @@ private int getBatchSize() {
5549
}
5650

5751
@Override
58-
public void call(InputStream workUnit, Stream<InputStream> input) {
59-
if (workUnit != null && !allowsWorkUnit())
60-
throw new IllegalArgumentException("Input endpoint does not accept work unit");
61-
62-
getCaller().streamCall(getClient(), null, null, workUnit, input);
52+
public InputStream call(InputStream endpointState, SessionState session, InputStream workUnit, InputStream[] input) {
53+
checkAllowedArgs(endpointState, session, workUnit);
54+
return getCaller().arrayCall(getClient(), endpointState, session, workUnit, input);
6355
}
6456

6557
@Override
@@ -97,6 +89,12 @@ public void accept(InputStream input) {
9789
if (hasBatch)
9890
processInput();
9991
}
92+
@Override
93+
public void acceptAll(InputStream[] input) {
94+
boolean hasBatch = queueAllInput(input, getQueue(), getBatchSize());
95+
if (hasBatch)
96+
processInput();
97+
}
10098

10199
@Override
102100
public void awaitCompletion() {

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputOutputEndpointImpl.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.marklogic.client.dataservices.impl;
1717

1818
import com.marklogic.client.DatabaseClient;
19+
import com.marklogic.client.SessionState;
1920
import com.marklogic.client.dataservices.InputOutputEndpoint;
2021
import com.marklogic.client.io.marker.JSONWriteHandle;
2122
import org.slf4j.Logger;
@@ -24,7 +25,6 @@
2425
import java.io.InputStream;
2526
import java.util.concurrent.LinkedBlockingQueue;
2627
import java.util.function.Consumer;
27-
import java.util.stream.Stream;
2828

2929
public class InputOutputEndpointImpl extends IOEndpointImpl implements InputOutputEndpoint {
3030
private static Logger logger = LoggerFactory.getLogger(InputOutputEndpointImpl.class);
@@ -50,12 +50,13 @@ private int getBatchSize() {
5050
}
5151

5252
@Override
53-
public Stream<InputStream> call(InputStream workUnit, Stream<InputStream> input) {
54-
return getCaller().streamCall(getClient(), null, null, workUnit, input);
53+
public InputStream[] call(InputStream endpointState, SessionState session, InputStream workUnit, InputStream[] input) {
54+
checkAllowedArgs(endpointState, session, workUnit);
55+
return getCaller().arrayCall(getClient(), endpointState, session, workUnit, input);
5556
}
5657

5758
@Override
58-
public BulkInputOutputCaller bulkCaller() {
59+
public InputOutputEndpoint.BulkInputOutputCaller bulkCaller() {
5960
return new BulkInputOutputCallerImpl(this, getBatchSize());
6061
}
6162

@@ -95,12 +96,21 @@ public void setOutputListener(Consumer<InputStream> listener) {
9596
@Override
9697
public void accept(InputStream input) {
9798
if (getOutputListener() == null)
98-
throw new IllegalStateException("Output consumer is null");
99+
throw new IllegalStateException("Must configure output consumer before providing input");
99100

100101
boolean hasBatch = queueInput(input, getQueue(), getBatchSize());
101102
if (hasBatch)
102103
processInput();
103104
}
105+
@Override
106+
public void acceptAll(InputStream[] input) {
107+
if (getOutputListener() == null)
108+
throw new IllegalStateException("Must configure output consumer before providing input");
109+
110+
boolean hasBatch = queueAllInput(input, getQueue(), getBatchSize());
111+
if (hasBatch)
112+
processInput();
113+
}
104114
private void processInput() {
105115
logger.trace("input endpoint running endpoint={} count={} state={}", getEndpointPath(), getCallCount(),
106116
getEndpointState());

0 commit comments

Comments
 (0)