Skip to content

Commit f3d41e8

Browse files
#838 - Calling retry with failure listeners rather than a single retry
1 parent 37aeb24 commit f3d41e8

File tree

6 files changed

+58
-11
lines changed

6 files changed

+58
-11
lines changed

src/main/java/com/marklogic/client/datamovement/HostAvailabilityListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public void processFailure(WriteBatch batch, Throwable throwable) {
182182
logger.warn("Retrying failed batch: {}, results so far: {}, uris: {}",
183183
batch.getJobBatchNumber(), batch.getJobWritesSoFar(),
184184
Stream.of(batch.getItems()).map(event->event.getTargetUri()).collect(Collectors.toList()));
185-
batch.getBatcher().retry(batch);
185+
batch.getBatcher().retryWithFailureListeners(batch);
186186
} catch (RuntimeException e) {
187187
logger.error("Exception during retry", e);
188188
processFailure(batch, e);
@@ -202,7 +202,7 @@ public void processFailure(QueryBatchException queryBatch) {
202202
logger.warn("Retrying failed batch: {}, results so far: {}, forest: {}, forestBatch: {}, forest results so far: {}",
203203
queryBatch.getJobBatchNumber(), queryBatch.getJobResultsSoFar(), queryBatch.getForest().getForestName(),
204204
queryBatch.getForestBatchNumber(), queryBatch.getForestResultsSoFar());
205-
queryBatch.getBatcher().retry(queryBatch);
205+
queryBatch.getBatcher().retryWithFailureListeners(queryBatch);
206206
} catch (RuntimeException e) {
207207
logger.error("Exception during retry", e);
208208
processFailure(new QueryBatchException(queryBatch, e));

src/main/java/com/marklogic/client/datamovement/QueryBatcher.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,4 +310,18 @@ public interface QueryBatcher extends Batcher {
310310
* @param queryBatchListener the QueryBatchListener which needs to be applied
311311
*/
312312
void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener);
313+
314+
/**
315+
* Retry in the same thread to query a batch that failed. If it fails again,
316+
* all the failure listeners associated with the batcher using onQueryFailure
317+
* method would be processed.
318+
*
319+
* Note : Use this method with caution as there is a possibility of infinite
320+
* loops. If a batch fails and one of the failure listeners calls this method
321+
* to retry with failure listeners and if the batch again fails, this would go
322+
* on as an infinite loop until the batch succeeds.
323+
*
324+
* @param queryEvent the information about the batch that failed
325+
*/
326+
void retryWithFailureListeners(QueryEvent queryEvent);
313327
}

src/main/java/com/marklogic/client/datamovement/WriteBatcher.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,4 +321,18 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
321321
* @throws IllegalStateException if this job has not yet been started
322322
*/
323323
JobTicket getJobTicket();
324+
325+
/**
326+
* Retry in the same thread to send a batch that failed. If it fails again,
327+
* all the failure listeners associated with the batcher using onBatchFailure
328+
* method would be processed.
329+
*
330+
* Note : Use this method with caution as there is a possibility of infinite
331+
* loops. If a batch fails and one of the failure listeners calls this method
332+
* to retry with failure listeners and if the batch again fails, this would go
333+
* on as an infinite loop until the batch succeeds.
334+
*
335+
* @param queryEvent the information about the batch that failed
336+
*/
337+
public void retryWithFailureListeners(WriteBatch writeBatch);
324338
}

src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
public class DataMovementManagerImpl implements DataMovementManager {
5252
private static Logger logger = LoggerFactory.getLogger(DataMovementManager.class);
5353
private DataMovementServices service = new DataMovementServices();
54-
private ConcurrentHashMap<String, JobTicket> activeJobs = new ConcurrentHashMap<>();
54+
private static ConcurrentHashMap<String, JobTicket> activeJobs = new ConcurrentHashMap<>();
5555
private ForestConfiguration forestConfig;
5656
private DatabaseClient primaryClient;
5757
// clientMap key is the hostname_database

src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,19 @@ public QueryBatcherImpl onQueryFailure(QueryFailureListener listener) {
124124
*/
125125
@Override
126126
public void retry(QueryEvent queryEvent) {
127+
retry(queryEvent, false);
128+
}
129+
130+
@Override
131+
public void retryWithFailureListeners(QueryEvent queryEvent) {
132+
retry(queryEvent, true);
133+
}
134+
135+
private void retry(QueryEvent queryEvent, boolean callFailListeners) {
127136
if ( isStopped() == true ) {
128137
logger.warn("Job is now stopped, aborting the retry");
129138
return;
130139
}
131-
boolean callFailListeners = false;
132140
Forest retryForest = null;
133141
for ( Forest forest : getForestConfig().listForests() ) {
134142
if ( forest.equals(queryEvent.getForest()) ) {
@@ -151,7 +159,6 @@ public void retry(QueryEvent queryEvent) {
151159
queryEvent.getForestBatchNumber(), start, queryEvent.getJobBatchNumber(), callFailListeners);
152160
runnable.run();
153161
}
154-
155162
/*
156163
* Accepts a QueryBatch which was successfully retrieved from the server and a
157164
* QueryBatchListener which was failed to apply and retry that listener on the batch.

src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -461,26 +461,38 @@ public WriteBatcher onBatchFailure(WriteFailureListener listener) {
461461
return this;
462462
}
463463

464+
@Override
465+
public void retryWithFailureListeners(WriteBatch batch) {
466+
retry(batch, true);
467+
}
468+
464469
@Override
465470
public void retry(WriteBatch batch) {
471+
retry(batch, false);
472+
}
473+
474+
private void retry(WriteBatch batch, boolean callFailListeners) {
466475
if ( isStopped() == true ) {
467476
logger.warn("Job is now stopped, aborting the retry");
468477
return;
469478
}
470479
if ( batch == null ) throw new IllegalArgumentException("batch must not be null");
471480
boolean forceNewTransaction = true;
472481
BatchWriteSet writeSet = newBatchWriteSet(forceNewTransaction, batch.getJobBatchNumber());
473-
writeSet.onFailure(throwable -> {
474-
if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
475-
else throw new DataMovementException("Failed to retry batch", throwable);
476-
});
477-
for ( WriteEvent doc : batch.getItems() ) {
482+
if ( !callFailListeners ) {
483+
writeSet.onFailure(throwable -> {
484+
if ( throwable instanceof RuntimeException )
485+
throw (RuntimeException) throwable;
486+
else
487+
throw new DataMovementException("Failed to retry batch", throwable);
488+
});
489+
}
490+
for (WriteEvent doc : batch.getItems()) {
478491
writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
479492
}
480493
BatchWriter runnable = new BatchWriter(writeSet);
481494
runnable.run();
482495
}
483-
484496
@Override
485497
public WriteBatchListener[] getBatchSuccessListeners() {
486498
return successListeners.toArray(new WriteBatchListener[successListeners.size()]);

0 commit comments

Comments
 (0)