Skip to content

Commit d4b6529

Browse files
author
ehennum
committed
take input as bufferable handles in error listener #1250
1 parent 82a2df3 commit d4b6529

File tree

12 files changed

+105
-77
lines changed

12 files changed

+105
-77
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputCaller.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.marklogic.client.DatabaseClient;
1919
import com.marklogic.client.dataservices.impl.InputEndpointImpl;
2020
import com.marklogic.client.io.marker.BufferableContentHandle;
21+
import com.marklogic.client.io.marker.BufferableHandle;
2122
import com.marklogic.client.io.marker.JSONWriteHandle;
2223

2324
/**
@@ -95,11 +96,35 @@ interface BulkInputCaller<I> extends BulkIOEndpointCaller {
9596
*/
9697
void acceptAll(I[] input);
9798

98-
void setErrorListener(ErrorListener<I> errorListener);
99+
/**
100+
* Provides a callback that specifies the disposition of a failed call.
101+
* @param errorListener the lambda or other implementation of the error listener
102+
*/
103+
void setErrorListener(ErrorListener errorListener);
99104

100-
interface ErrorListener<I> {
105+
/**
106+
* A function implementation that specifies the disposition of a failed call.
107+
*/
108+
interface ErrorListener {
109+
/**
110+
* The signature for the lambda or other implementation of the callback that specifies
111+
* the disposition of a failed call.
112+
*
113+
* The input is typed with the BufferableHandle marker interface. The actual class
114+
* of the handle is
115+
* <ul>
116+
* <li>the same as the input handle provided when constructing the InputOutputCaller
117+
* if the content representation is resendable for retry (as with String)</li>
118+
* <li>a BytesHandle if the content must be buffered for retry (as with InputStream)</li>
119+
* </ul>
120+
* @param retryCount the number of times the call with this input has been retried
121+
* @param throwable the error received
122+
* @param callContext the context for the call
123+
* @param input the input for the call
124+
* @return whether to retry the call, skip the call, or stop the job
125+
*/
101126
ErrorDisposition processError(
102-
int retryCount, Throwable throwable, CallContext callContext, I[] input
127+
int retryCount, Throwable throwable, CallContext callContext, BufferableHandle[] input
103128
);
104129
}
105130
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputOutputCaller.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.marklogic.client.DatabaseClient;
1919
import com.marklogic.client.dataservices.impl.InputOutputEndpointImpl;
2020
import com.marklogic.client.io.marker.BufferableContentHandle;
21+
import com.marklogic.client.io.marker.BufferableHandle;
2122
import com.marklogic.client.io.marker.JSONWriteHandle;
2223

2324
import java.util.function.Consumer;
@@ -111,11 +112,35 @@ interface BulkInputOutputCaller<I,O> extends BulkIOEndpointCaller {
111112
*/
112113
void acceptAll(I[] input);
113114

114-
void setErrorListener(ErrorListener<I> errorListener);
115+
/**
116+
* Provides a callback specifies the disposition of a failed call.
117+
* @param errorListener the lambda or other implementation of the error listener
118+
*/
119+
void setErrorListener(ErrorListener errorListener);
115120

116-
interface ErrorListener<I> {
121+
/**
122+
* A function implementation that specifies the disposition of a failed call.
123+
*/
124+
interface ErrorListener {
125+
/**
126+
* The signature for the lambda or other implementation of the callback that specifies
127+
* the disposition of a failed call.
128+
*
129+
* The input is typed with the BufferableHandle marker interface. The actual class
130+
* of the handle is
131+
* <ul>
132+
* <li>the same as the input handle provided when constructing the InputOutputCaller
133+
* if the content representation is resendable for retry (as with String)</li>
134+
* <li>a BytesHandle if the content must be buffered for retry (as with InputStream)</li>
135+
* </ul>
136+
* @param retryCount the number of times the call with this input has been retried
137+
* @param throwable the error received
138+
* @param callContext the context for the call
139+
* @param input the input for the call
140+
* @return whether to retry the call, skip the call, or stop the job
141+
*/
117142
ErrorDisposition processError(
118-
int retryCount, Throwable throwable, CallContext callContext, I[] input
143+
int retryCount, Throwable throwable, CallContext callContext, BufferableHandle[] input
119144
);
120145
}
121146
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOCallerImpl.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.marklogic.client.io.BaseHandle;
2525
import com.marklogic.client.io.BytesHandle;
2626
import com.marklogic.client.io.marker.BufferableContentHandle;
27+
import com.marklogic.client.io.marker.BufferableHandle;
2728
import com.marklogic.client.io.marker.JSONWriteHandle;
2829

2930
import java.util.stream.Stream;
@@ -188,35 +189,13 @@ BaseProxy.DBFunctionRequest makeRequest(DatabaseClient db, CallContextImpl<I,O>
188189
return makeRequest(db, callCtxt, (RESTServices.CallField) null);
189190
}
190191
BaseProxy.DBFunctionRequest makeRequest(
191-
DatabaseClient db, CallContextImpl<I,O> callCtxt, Stream<I> input
192+
DatabaseClient db, CallContextImpl<I,O> callCtxt, BufferableHandle[] input
192193
) {
193194
RESTServices.CallField inputField = null;
194195

195196
ParamdefImpl paramdef = getInputParamdef();
196197
if (paramdef != null) {
197-
inputField = BaseProxy.documentParam(
198-
"input",
199-
paramdef.isNullable(),
200-
NodeConverter.streamWithFormat(input.map(inputHandle::resendableHandleFor), paramdef.getFormat())
201-
);
202-
} else if (input != null) {
203-
throw new IllegalArgumentException("input parameter not supported by endpoint: "+getEndpointPath());
204-
}
205-
206-
return makeRequest(db, callCtxt, inputField);
207-
}
208-
BaseProxy.DBFunctionRequest makeRequest(
209-
DatabaseClient db, CallContextImpl<I,O> callCtxt, I[] input
210-
) {
211-
RESTServices.CallField inputField = null;
212-
213-
ParamdefImpl paramdef = getInputParamdef();
214-
if (paramdef != null) {
215-
inputField = BaseProxy.documentParam(
216-
"input",
217-
paramdef.isNullable(),
218-
NodeConverter.arrayWithFormat(inputHandle.resendableHandleFor(input), paramdef.getFormat())
219-
);
198+
inputField = BaseProxy.documentParam("input", paramdef.isNullable(), input);
220199
} else if (input != null && input.length > 0) {
221200
throw new IllegalArgumentException("input parameter not supported by endpoint: "+getEndpointPath());
222201
}
@@ -309,9 +288,6 @@ O responseSingle(BaseProxy.DBFunctionRequest request) {
309288

310289
return request.responseSingle(getReturndef().isNullable(), getReturndef().getFormat()).asContent(outputHandle);
311290
}
312-
Stream<O> responseMultipleAsStream(BaseProxy.DBFunctionRequest request, CallContextImpl<I,O> callCtxt) {
313-
return responseMultiple(request).asStreamOfContent(callCtxt.isLegacyContext() ? null : callCtxt.getEndpointState(), outputHandle);
314-
}
315291
O[] responseMultipleAsArray(BaseProxy.DBFunctionRequest request, CallContextImpl<I,O> callCtxt) {
316292
return responseMultiple(request).asArrayOfContent(callCtxt.isLegacyContext() ? null : callCtxt.getEndpointState(), outputHandle);
317293
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputCallerImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.marklogic.client.DatabaseClient;
2121
import com.marklogic.client.io.marker.BufferableContentHandle;
22+
import com.marklogic.client.io.marker.BufferableHandle;
2223
import com.marklogic.client.io.marker.JSONWriteHandle;
2324

2425
final public class InputCallerImpl<I,O> extends IOCallerImpl<I,O> {
@@ -41,10 +42,7 @@ public InputCallerImpl(JSONWriteHandle apiDeclaration, BufferableContentHandle<I
4142
}
4243
}
4344

44-
public void streamCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, Stream<I> input) {
45-
responseWithState(makeRequest(db, callCtxt, input), callCtxt);
46-
}
47-
public void arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, I[] input) {
45+
public void arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, BufferableHandle[] input) {
4846
responseWithState(makeRequest(db, callCtxt, input), callCtxt);
4947
}
5048
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputEndpointImpl.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.marklogic.client.dataservices.InputCaller;
2323
import com.marklogic.client.io.marker.BufferableContentHandle;
24+
import com.marklogic.client.io.marker.BufferableHandle;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627

@@ -56,7 +57,8 @@ public void call(I[] input) {
5657
}
5758
@Override
5859
public void call(CallContext callContext, I[] input) {
59-
getCaller().arrayCall(getClient(), checkAllowedArgs(callContext), input);
60+
InputCallerImpl<I,O> callerImpl = getCaller();
61+
callerImpl.arrayCall(getClient(), checkAllowedArgs(callContext), callerImpl.getInputHandle().resendableHandleFor(input));
6062
}
6163

6264
@Deprecated
@@ -103,7 +105,7 @@ public static class BulkInputCallerImpl<I,O> extends IOEndpointImpl.BulkIOEndpoi
103105
private final InputEndpointImpl<I,O> endpoint;
104106
private final int batchSize;
105107
private final LinkedBlockingQueue<I> inputQueue;
106-
private ErrorListener<I> errorListener;
108+
private ErrorListener errorListener;
107109

108110
public BulkInputCallerImpl(InputEndpointImpl<I,O> endpoint) {
109111
this(endpoint, endpoint.getBatchSize(), endpoint.checkAllowedArgs(endpoint.newCallContext()));
@@ -134,10 +136,6 @@ private LinkedBlockingQueue<I> getInputQueue() {
134136
return inputQueue;
135137
}
136138

137-
private ErrorListener<I> getErrorListener() {
138-
return this.errorListener;
139-
}
140-
141139
@Override
142140
public void accept(I input) {
143141
boolean hasBatch = queueInput(input, getInputQueue(), getBatchSize());
@@ -151,8 +149,12 @@ public void acceptAll(I[] input) {
151149
processInput();
152150
}
153151

152+
private ErrorListener getErrorListener() {
153+
return this.errorListener;
154+
}
155+
154156
@Override
155-
public void setErrorListener(ErrorListener<I> errorListener) {
157+
public void setErrorListener(ErrorListener errorListener) {
156158
this.errorListener = errorListener;
157159
}
158160

@@ -190,12 +192,15 @@ else if(getCallContextQueue() != null){
190192
private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
191193
logger.trace("input endpoint running endpoint={} count={} state={}", (callContext).getEndpoint().getEndpointPath(), getCallCount(),
192194
callContext.getEndpointState());
195+
InputCallerImpl<I,O> callerImpl = getEndpoint().getCaller();
196+
193197
ErrorDisposition error = ErrorDisposition.RETRY;
194198

199+
BufferableHandle[] inputHandles = callerImpl.getInputHandle().resendableHandleFor(inputBatch);
195200
for (int retryCount = 0; retryCount < DEFAULT_MAX_RETRIES && error == ErrorDisposition.RETRY; retryCount++) {
196201
Throwable throwable = null;
197202
try {
198-
getEndpoint().getCaller().arrayCall(callContext.getClient(), callContext, inputBatch);
203+
getEndpoint().getCaller().arrayCall(callContext.getClient(), callContext, inputHandles);
199204
incrementCallCount();
200205
return;
201206
} catch (Throwable catchedThrowable) {
@@ -210,7 +215,7 @@ private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
210215

211216
try {
212217
if (retryCount < DEFAULT_MAX_RETRIES - 1) {
213-
error = getErrorListener().processError(retryCount, throwable, callContext, inputBatch);
218+
error = getErrorListener().processError(retryCount, throwable, callContext, inputHandles);
214219
} else {
215220
error = ErrorDisposition.SKIP_CALL;
216221
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputOutputCallerImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.marklogic.client.DatabaseClient;
2121
import com.marklogic.client.io.marker.BufferableContentHandle;
22+
import com.marklogic.client.io.marker.BufferableHandle;
2223
import com.marklogic.client.io.marker.JSONWriteHandle;
2324

2425
final public class InputOutputCallerImpl<I,O> extends IOCallerImpl<I,O> {
@@ -39,10 +40,7 @@ public InputOutputCallerImpl(
3940
}
4041
}
4142

42-
public O[] arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, I[] input) {
43+
public O[] arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, BufferableHandle[] input) {
4344
return responseMultipleAsArray(makeRequest(db, callCtxt, input), callCtxt);
4445
}
45-
public Stream<O> streamCall(DatabaseClient db, CallContextImpl<I,O> callCtxt, Stream<I> input) {
46-
return responseMultipleAsStream(makeRequest(db, callCtxt, input), callCtxt);
47-
}
4846
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/InputOutputEndpointImpl.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.marklogic.client.SessionState;
2020
import com.marklogic.client.dataservices.InputOutputCaller;
2121
import com.marklogic.client.io.marker.BufferableContentHandle;
22+
import com.marklogic.client.io.marker.BufferableHandle;
2223
import com.marklogic.client.io.marker.JSONWriteHandle;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
@@ -100,7 +101,10 @@ public BulkInputOutputCaller<I,O> bulkCaller(CallContext[] callContexts, int thr
100101
}
101102

102103
private O[] getResponseData(CallContext callContext, I[] input) {
103-
return getCaller().arrayCall(getClient(), checkAllowedArgs(callContext), input);
104+
InputOutputCallerImpl<I,O> callerImpl = getCaller();
105+
return callerImpl.arrayCall(
106+
getClient(), checkAllowedArgs(callContext), callerImpl.getInputHandle().resendableHandleFor(input)
107+
);
104108
}
105109

106110
static public class BulkInputOutputCallerImpl<I,O> extends IOEndpointImpl.BulkIOEndpointCallerImpl<I,O>
@@ -110,7 +114,7 @@ static public class BulkInputOutputCallerImpl<I,O> extends IOEndpointImpl.BulkIO
110114
private final int batchSize;
111115
private final LinkedBlockingQueue<I> inputQueue;
112116
private Consumer<O> outputListener;
113-
private ErrorListener<I> errorListener;
117+
private ErrorListener errorListener;
114118

115119
public BulkInputOutputCallerImpl(InputOutputEndpointImpl<I,O> endpoint) {
116120
this(endpoint, endpoint.getBatchSize(), endpoint.checkAllowedArgs(endpoint.newCallContext()));
@@ -168,11 +172,11 @@ public void acceptAll(I[] input) {
168172
}
169173

170174
@Override
171-
public void setErrorListener(ErrorListener<I> errorListener) {
175+
public void setErrorListener(ErrorListener errorListener) {
172176
this.errorListener = errorListener;
173177
}
174178

175-
private ErrorListener<I> getErrorListener() {
179+
private ErrorListener getErrorListener() {
176180
return this.errorListener;
177181
}
178182

@@ -193,13 +197,16 @@ private void processInput() {
193197
private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
194198
logger.trace("input endpoint running endpoint={} count={} state={}", (callContext).getEndpoint().getEndpointPath(), getCallCount(),
195199
callContext.getEndpointState());
200+
InputOutputCallerImpl<I,O> callerImpl = getEndpoint().getCaller();
201+
196202
ErrorDisposition error = ErrorDisposition.RETRY;
197203

204+
BufferableHandle[] inputHandles = callerImpl.getInputHandle().resendableHandleFor(inputBatch);
198205
for (int retryCount = 0; retryCount < DEFAULT_MAX_RETRIES && error == ErrorDisposition.RETRY; retryCount++) {
199206
Throwable throwable = null;
200207
O[] output = null;
201208
try {
202-
output = getEndpoint().getCaller().arrayCall(callContext.getClient(), callContext, inputBatch);
209+
output = callerImpl.arrayCall(callContext.getClient(), callContext, inputHandles);
203210

204211
incrementCallCount();
205212
processOutputBatch(output, getOutputListener());
@@ -212,7 +219,7 @@ private void processInput(CallContextImpl<I,O> callContext, I[] inputBatch) {
212219
if (getErrorListener() != null) {
213220
try {
214221
if (retryCount < DEFAULT_MAX_RETRIES - 1) {
215-
error = getErrorListener().processError(retryCount, throwable, callContext, inputBatch);
222+
error = getErrorListener().processError(retryCount, throwable, callContext, inputHandles);
216223
} else {
217224
error = ErrorDisposition.SKIP_CALL;
218225
}

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/OutputCallerImpl.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,4 @@ public OutputCallerImpl(JSONWriteHandle apiDeclaration, BufferableContentHandle<
4040
public O[] arrayCall(DatabaseClient db, CallContextImpl<I,O> callCtxt) {
4141
return responseMultipleAsArray(makeRequest(db, callCtxt), callCtxt);
4242
}
43-
public Stream<O> streamCall(DatabaseClient db, CallContextImpl<I,O> callCtxt) {
44-
return responseMultipleAsStream(makeRequest(db, callCtxt), callCtxt);
45-
}
4643
}

marklogic-client-api/src/main/java/com/marklogic/client/io/marker/BufferableContentHandle.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package com.marklogic.client.io.marker;
1717

18-
import com.marklogic.client.io.BytesHandle;
19-
2018
/**
2119
* A Bufferable Content Handle provides an adapter for a content
2220
* representation that can be read multiple times for purposes

marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/ErrorListenerExecEndpointTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,9 @@ public void testBulkExecCallerImplWithStop() throws IOException {
153153
JsonNode finalState1 = docMgr.read(finalStateUri1, new JacksonHandle()).get();
154154
JsonNode finalState2 = docMgr.read(finalStateUri2, new JacksonHandle()).get();
155155
assertTrue("finalState1 is wrong, should be less than 15, but " + finalState1.get("state").get("next").asText(),
156-
Integer.valueOf(finalState1.get("state").get("next").asText()) < 15);
156+
finalState1.get("state").get("next").asInt() < 15);
157157
assertTrue("finalState2 is wrong, should be less than 26, but " + finalState2.get("state").get("next").asText(),
158-
Integer.valueOf(finalState2.get("state").get("next").asText()) <= 26);
158+
finalState2.get("state").get("next").asInt() <= 26);
159159
assertNotNull("null final state", finalState1);
160160
assertTrue("final state not object", finalState1.isObject());
161161
assertNotNull("null final state", finalState2);

0 commit comments

Comments
 (0)