|
23 | 23 | import com.apple.foundationdb.annotation.API; |
24 | 24 | import com.apple.foundationdb.util.LoggableException; |
25 | 25 | import com.google.common.base.Suppliers; |
| 26 | +import com.google.common.collect.Lists; |
26 | 27 | import com.google.common.util.concurrent.ThreadFactoryBuilder; |
27 | 28 |
|
28 | 29 | import javax.annotation.Nonnull; |
29 | 30 | import javax.annotation.Nullable; |
30 | 31 | import java.util.ArrayDeque; |
31 | 32 | import java.util.ArrayList; |
| 33 | +import java.util.Arrays; |
32 | 34 | import java.util.Collections; |
33 | 35 | import java.util.Iterator; |
34 | 36 | import java.util.List; |
|
42 | 44 | import java.util.concurrent.ScheduledThreadPoolExecutor; |
43 | 45 | import java.util.concurrent.ThreadFactory; |
44 | 46 | import java.util.concurrent.TimeUnit; |
| 47 | +import java.util.concurrent.atomic.AtomicInteger; |
| 48 | +import java.util.concurrent.atomic.AtomicReference; |
45 | 49 | import java.util.function.BiConsumer; |
46 | 50 | import java.util.function.BiFunction; |
47 | 51 | import java.util.function.Function; |
| 52 | +import java.util.function.IntPredicate; |
| 53 | +import java.util.function.IntUnaryOperator; |
48 | 54 | import java.util.function.Predicate; |
49 | 55 | import java.util.function.Supplier; |
50 | 56 |
|
@@ -1051,6 +1057,93 @@ public static CompletableFuture<Void> swallowException(@Nonnull CompletableFutur |
1051 | 1057 | return result; |
1052 | 1058 | } |
1053 | 1059 |
|
| 1060 | + /** |
| 1061 | + * Method that provides the functionality of a for loop, however, in an asynchronous way. The result of this method |
| 1062 | + * is a {@link CompletableFuture} that represents the result of the last iteration of the loop body. |
| 1063 | + * @param startI an integer analogous to the starting value of a loop variable in a for loop |
| 1064 | + * @param startU an object of some type {@code U} that represents some initial state that is passed to the loop's |
| 1065 | + * initial state |
| 1066 | + * @param conditionPredicate a predicate on the loop variable that must be true before the next iteration is |
| 1067 | + * entered; analogous to the condition in a for loop |
| 1068 | + * @param stepFunction a unary operator used for modifying the loop variable after each iteration |
| 1069 | + * @param body a bi-function to be called for each iteration; this function is initially invoked using |
| 1070 | + * {@code startI} and {@code startU}; the result of the body is then passed into the next iterator's body |
| 1071 | + * together with a new value for the loop variable. In this way callers can access state inside an iteration |
| 1072 | + * that was computed in a previous iteration. |
| 1073 | + * @param executor the executor |
| 1074 | + * @param <U> the type of the result of the body {@link BiFunction} |
| 1075 | + * @return a {@link CompletableFuture} containing the result of the last iteration's body invocation. |
| 1076 | + */ |
| 1077 | + @Nonnull |
| 1078 | + public static <U> CompletableFuture<U> forLoop(final int startI, @Nullable final U startU, |
| 1079 | + @Nonnull final IntPredicate conditionPredicate, |
| 1080 | + @Nonnull final IntUnaryOperator stepFunction, |
| 1081 | + @Nonnull final BiFunction<Integer, U, CompletableFuture<U>> body, |
| 1082 | + @Nonnull final Executor executor) { |
| 1083 | + final AtomicInteger loopVariableAtomic = new AtomicInteger(startI); |
| 1084 | + final AtomicReference<U> lastResultAtomic = new AtomicReference<>(startU); |
| 1085 | + return whileTrue(() -> { |
| 1086 | + final int loopVariable = loopVariableAtomic.get(); |
| 1087 | + if (!conditionPredicate.test(loopVariable)) { |
| 1088 | + return AsyncUtil.READY_FALSE; |
| 1089 | + } |
| 1090 | + return body.apply(loopVariable, lastResultAtomic.get()) |
| 1091 | + .thenApply(result -> { |
| 1092 | + loopVariableAtomic.set(stepFunction.applyAsInt(loopVariable)); |
| 1093 | + lastResultAtomic.set(result); |
| 1094 | + return true; |
| 1095 | + }); |
| 1096 | + }, executor).thenApply(ignored -> lastResultAtomic.get()); |
| 1097 | + } |
| 1098 | + |
| 1099 | + /** |
| 1100 | + * Method to iterate over some items, for each of which a body is executed asynchronously. The result of each such |
| 1101 | + * executed is then collected in a list and returned as a {@link CompletableFuture} over that list. |
| 1102 | + * @param items the items to iterate over |
| 1103 | + * @param body a function to be called for each item |
| 1104 | + * @param parallelism the maximum degree of parallelism this method should use |
| 1105 | + * @param executor the executor |
| 1106 | + * @param <T> the type of item |
| 1107 | + * @param <U> the type of the result |
| 1108 | + * @return a {@link CompletableFuture} containing a list of results collected from the individual body invocations |
| 1109 | + */ |
| 1110 | + @Nonnull |
| 1111 | + @SuppressWarnings("unchecked") |
| 1112 | + public static <T, U> CompletableFuture<List<U>> forEach(@Nonnull final Iterable<T> items, |
| 1113 | + @Nonnull final Function<T, CompletableFuture<U>> body, |
| 1114 | + final int parallelism, |
| 1115 | + @Nonnull final Executor executor) { |
| 1116 | + // this deque is only modified by once upon creation |
| 1117 | + final ArrayDeque<T> toBeProcessed = new ArrayDeque<>(); |
| 1118 | + for (final T item : items) { |
| 1119 | + toBeProcessed.addLast(item); |
| 1120 | + } |
| 1121 | + |
| 1122 | + final List<CompletableFuture<Void>> working = Lists.newArrayList(); |
| 1123 | + final AtomicInteger indexAtomic = new AtomicInteger(0); |
| 1124 | + final Object[] resultArray = new Object[toBeProcessed.size()]; |
| 1125 | + |
| 1126 | + return whileTrue(() -> { |
| 1127 | + working.removeIf(CompletableFuture::isDone); |
| 1128 | + |
| 1129 | + while (working.size() <= parallelism) { |
| 1130 | + final T currentItem = toBeProcessed.pollFirst(); |
| 1131 | + if (currentItem == null) { |
| 1132 | + break; |
| 1133 | + } |
| 1134 | + |
| 1135 | + final int index = indexAtomic.getAndIncrement(); |
| 1136 | + working.add(body.apply(currentItem) |
| 1137 | + .thenAccept(result -> resultArray[index] = result)); |
| 1138 | + } |
| 1139 | + |
| 1140 | + if (working.isEmpty()) { |
| 1141 | + return AsyncUtil.READY_FALSE; |
| 1142 | + } |
| 1143 | + return whenAny(working).thenApply(ignored -> true); |
| 1144 | + }, executor).thenApply(ignored -> Arrays.asList((U[])resultArray)); |
| 1145 | + } |
| 1146 | + |
1054 | 1147 | /** |
1055 | 1148 | * A {@code Boolean} function that is always true. |
1056 | 1149 | * @param <T> the type of the (ignored) argument to the function |
|
0 commit comments