@@ -247,6 +247,14 @@ private void insertField(LuceneDocumentFromRecord.DocumentField field, final Doc
247247 }
248248 }
249249
250+ <M extends Message > void writeDocument (final FDBIndexableRecord <M > newRecord , final Map .Entry <Tuple , List <LuceneDocumentFromRecord .DocumentField >> entry , final Integer partitionId ) {
251+ try {
252+ writeDocument (entry .getValue (), entry .getKey (), partitionId , newRecord .getPrimaryKey ());
253+ } catch (IOException e ) {
254+ throw LuceneExceptions .toRecordCoreException ("Issue updating new index keys" , e , "newRecord" , newRecord .getPrimaryKey ());
255+ }
256+ }
257+
250258 @ SuppressWarnings ("PMD.CloseResource" )
251259 private void writeDocument (@ Nonnull List <LuceneDocumentFromRecord .DocumentField > fields ,
252260 Tuple groupingKey ,
@@ -473,30 +481,30 @@ <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<
473481
474482 LOG .trace ("update oldFields={}, newFields{}" , oldRecordFields , newRecordFields );
475483
476- // delete old
477- return AsyncUtil . whenAll ( oldRecordFields . keySet (). stream (). map ( t -> {
478- try {
479- return tryDelete ( Objects . requireNonNull ( oldRecord ), t );
480- } catch ( IOException e ) {
481- throw LuceneExceptions . toRecordCoreException ( "Issue deleting" , e , "record" , Objects . requireNonNull ( oldRecord ). getPrimaryKey ());
482- }
483- }). collect ( Collectors . toList ())). thenCompose ( ignored ->
484- // update new
485- AsyncUtil . whenAll ( newRecordFields . entrySet (). stream (). map ( entry -> {
486- try {
487- return tryDeleteInWriteOnlyMode ( Objects . requireNonNull ( newRecord ), entry . getKey ()). thenCompose ( countDeleted ->
488- partitioner . addToAndSavePartitionMetadata ( newRecord , entry . getKey (), destinationPartitionIdHint ). thenApply ( partitionId -> {
489- try {
490- writeDocument ( entry . getValue (), entry . getKey (), partitionId , newRecord . getPrimaryKey ());
491- } catch ( IOException e ) {
492- throw LuceneExceptions . toRecordCoreException ( "Issue updating new index keys" , e , "newRecord" , newRecord . getPrimaryKey ());
493- }
494- return null ;
495- }));
496- } catch ( IOException e ) {
497- throw LuceneExceptions . toRecordCoreException ( "Issue updating" , e , "record" , Objects .requireNonNull (newRecord ). getPrimaryKey ());
498- }
499- }). collect ( Collectors . toList () )));
484+ return AsyncUtil . whenAll ( oldRecordFields . keySet (). stream ()
485+ // delete old
486+ . map ( groupingKey -> tryDelete ( Objects . requireNonNull ( oldRecord ), groupingKey ))
487+ . collect ( Collectors . toList ()))
488+ . thenCompose ( ignored ->
489+ // update new
490+ AsyncUtil . whenAll ( newRecordFields . entrySet (). stream ()
491+ . map ( entry -> updateRecord ( newRecord , destinationPartitionIdHint , entry ))
492+ . collect ( Collectors . toList ())));
493+ }
494+
495+ /**
496+ * Internal utility to update a single record.
497+ * @param newRecord the new record to save
498+ * @param destinationPartitionIdHint partition ID
499+ * @param entry entry from the grouping key to the document fields
500+ */
501+ private < M extends Message > CompletableFuture < Void > updateRecord (
502+ final FDBIndexableRecord < M > newRecord ,
503+ final Integer destinationPartitionIdHint ,
504+ final Map . Entry < Tuple , List < LuceneDocumentFromRecord . DocumentField >> entry ) {
505+ return tryDeleteInWriteOnlyMode ( Objects .requireNonNull (newRecord ), entry . getKey ()). thenCompose ( countDeleted ->
506+ partitioner . addToAndSavePartitionMetadata ( newRecord , entry . getKey (), destinationPartitionIdHint )
507+ . thenAccept ( partitionId -> writeDocument ( newRecord , entry , partitionId )));
500508 }
501509
502510 /**
@@ -509,10 +517,9 @@ <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<
509517 * @param groupingKey grouping key
510518 * @param <M> message
511519 * @return count of deleted docs
512- * @throws IOException propagated by {@link #tryDelete(FDBIndexableRecord, Tuple)}
513520 */
514521 private <M extends Message > CompletableFuture <Integer > tryDeleteInWriteOnlyMode (@ Nonnull FDBIndexableRecord <M > record ,
515- @ Nonnull Tuple groupingKey ) throws IOException {
522+ @ Nonnull Tuple groupingKey ) {
516523 if (!state .store .isIndexWriteOnly (state .index )) {
517524 // no op
518525 return CompletableFuture .completedFuture (0 );
@@ -529,29 +536,35 @@ private <M extends Message> CompletableFuture<Integer> tryDeleteInWriteOnlyMode(
529536 * @param <M> record message
530537 * @return count of deleted docs: 1 indicates that the record has been deleted, 0 means that either no record was deleted or it was deleted by
531538 * query.
532- * @throws IOException propagated from {@link #deleteDocument(Tuple, Integer, Tuple)}
533539 */
534540 private <M extends Message > CompletableFuture <Integer > tryDelete (@ Nonnull FDBIndexableRecord <M > record ,
535- @ Nonnull Tuple groupingKey ) throws IOException {
536- // non-partitioned
537- if (!partitioner .isPartitioningEnabled ()) {
538- return CompletableFuture .completedFuture (deleteDocument (groupingKey , null , record .getPrimaryKey ()));
541+ @ Nonnull Tuple groupingKey ) {
542+ try {
543+ // non-partitioned
544+ if (!partitioner .isPartitioningEnabled ()) {
545+ return CompletableFuture .completedFuture (deleteDocument (groupingKey , null , record .getPrimaryKey ()));
546+ }
547+ } catch (IOException e ) {
548+ throw LuceneExceptions .toRecordCoreException ("Issue deleting" , e , "record" , Objects .requireNonNull (record ).getPrimaryKey ());
539549 }
540550
541551 // partitioned
542- return partitioner .tryGetPartitionInfo (record , groupingKey ).thenApply (partitionInfo -> {
543- if (partitionInfo != null ) {
544- try {
545- int countDeleted = deleteDocument (groupingKey , partitionInfo .getId (), record .getPrimaryKey ());
546- if (countDeleted > 0 ) {
547- partitioner .decrementCountAndSave (groupingKey , partitionInfo , countDeleted );
548- }
549- return countDeleted ;
550- } catch (IOException e ) {
551- throw LuceneExceptions .toRecordCoreException ("Issue deleting" , e , "record" , record .getPrimaryKey ());
552+ return partitioner .tryGetPartitionInfo (record , groupingKey ).thenCompose (partitionInfo -> {
553+ if (partitionInfo == null ) {
554+ return CompletableFuture .completedFuture (0 );
555+ }
556+ try {
557+ int countDeleted = deleteDocument (groupingKey , partitionInfo .getId (), record .getPrimaryKey ());
558+ // this might be 0 when in writeOnly mode, but otherwise should not happen.
559+ if (countDeleted > 0 ) {
560+ return partitioner .decrementCountAndSave (groupingKey , countDeleted , partitionInfo .getId ())
561+ .thenApply (vignore -> countDeleted );
562+ } else {
563+ return CompletableFuture .completedFuture (countDeleted );
552564 }
565+ } catch (IOException e ) {
566+ throw LuceneExceptions .toRecordCoreException ("Issue deleting" , e , "record" , record .getPrimaryKey ());
553567 }
554- return 0 ;
555568 });
556569 }
557570
0 commit comments