1- /*
2- * Copyright 2016 the original author or authors.
3- *
4- * Licensed under the Apache License, Version 2.0 (the "License");
5- * you may not use this file except in compliance with the License.
6- * You may obtain a copy of the License at
7- *
8- * http://www.apache.org/licenses/LICENSE-2.0
9- *
10- * Unless required by applicable law or agreed to in writing, software
11- * distributed under the License is distributed on an "AS IS" BASIS,
12- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13- * See the License for the specific language governing permissions and
14- * limitations under the License
15- */
161package net .jodah .failsafe ;
172
18- import net .jodah .failsafe .internal .util .Assert ;
19- import net .jodah .failsafe .util .concurrent .Scheduler ;
20-
21- import java .util .List ;
223import java .util .concurrent .CompletableFuture ;
23- import java .util .concurrent .CompletionException ;
24- import java .util .concurrent .Future ;
25- import java .util .function .Function ;
264
275/**
28- * Tracks asynchronous executions and handles failures according to one or more {@link Policy policies}. Execution
29- * results must be explicitly recorded via one of the {@code record} methods.
6+ * Allows asynchronous executions to record their results or complete an execution.
307 *
318 * @param <R> result type
329 * @author Jonathan Halterman
3310 */
34- public final class AsyncExecution <R > extends AbstractExecution <R > {
35- // Cross-attempt state --
36- // The outer-most function that executions begin with
37- private Function <AsyncExecution <R >, CompletableFuture <ExecutionResult >> outerFn ;
38- private final boolean asyncExecution ;
39- final FailsafeFuture <R > future ;
40-
41- // Per-attempt state --
42- // Whether a result has been explicitly recorded
43- volatile boolean recordCalled ;
44- // The future for the thread that the innerFn is running in
45- volatile Future <?> innerFuture ;
46- // Whether a policy executor completed post execution
47- private final boolean [] policyPostExecuted ;
48-
49- AsyncExecution (List <Policy <R >> policies , Scheduler scheduler , FailsafeFuture <R > future , boolean asyncExecution ,
50- Function <AsyncExecution <R >, CompletableFuture <ExecutionResult >> innerFn ) {
51- super (policies , scheduler );
52- this .future = future ;
53- this .asyncExecution = asyncExecution ;
54- this .policyPostExecuted = new boolean [policyExecutors .size ()];
55-
56- outerFn = asyncExecution ? Functions .toExecutionAware (innerFn ) : innerFn ;
57- outerFn = Functions .toAsync (outerFn , scheduler );
58-
59- for (PolicyExecutor <R , ? extends Policy <R >> policyExecutor : policyExecutors )
60- outerFn = policyExecutor .applyAsync (outerFn , scheduler , this .future );
61- }
62-
63- private AsyncExecution (AsyncExecution <R > execution ) {
64- super (execution );
65- outerFn = execution .outerFn ;
66- future = execution .future ;
67- asyncExecution = execution .asyncExecution ;
68- policyPostExecuted = new boolean [policyExecutors .size ()];
69- }
70-
11+ public interface AsyncExecution <R > extends ExecutionContext <R > {
7112 /**
7213 * Completes the execution and the associated {@code CompletableFuture}.
7314 *
74- * @throws IllegalStateException if the execution is already complete
15+ * @throws IllegalStateException if the execution is already recorded or complete
7516 */
76- public void complete () {
77- Assert .state (!recordCalled , "The most recent execution has already been recorded" );
78- recordCalled = true ;
17+ void complete ();
7918
80- // Guard against race with a timeout expiring
81- synchronized (future ) {
82- ExecutionResult result = this .result != null ? this .result : ExecutionResult .NONE ;
83- complete (postExecute (result ), null );
84- }
85- }
19+ /**
20+ * Returns whether the execution is complete or if it can be retried. An execution is considered complete only when
21+ * all configured policies consider the execution complete.
22+ */
23+ boolean isComplete ();
8624
8725 /**
8826 * Records an execution {@code result} or {@code failure} which triggers failure handling, if needed, by the
@@ -91,78 +29,21 @@ public void complete() {
9129 *
9230 * @throws IllegalStateException if the most recent execution was already recorded or the execution is complete
9331 */
94- public void record (R result , Throwable failure ) {
95- Assert .state (!recordCalled , "The most recent execution has already been recorded" );
96- recordCalled = true ;
97-
98- // Guard against race with a timeout expiring
99- synchronized (future ) {
100- if (!attemptRecorded ) {
101- ExecutionResult er = new ExecutionResult (result , failure ).withDelay (delayNanos );
102- record (er );
103- }
104-
105- // Proceed with handling the recorded result
106- executeAsync ();
107- }
108- }
32+ void record (R result , Throwable failure );
10933
11034 /**
11135 * Records an execution {@code result} which triggers failure handling, if needed, by the configured policies. If
11236 * policy handling is not possible or already complete, the resulting {@link CompletableFuture} is completed.
11337 *
11438 * @throws IllegalStateException if the most recent execution was already recorded or the execution is complete
11539 */
116- public void recordResult (R result ) {
117- record (result , null );
118- }
40+ void recordResult (R result );
11941
12042 /**
12143 * Records an execution {@code failure} which triggers failure handling, if needed, by the configured policies. If
12244 * policy handling is not possible or already complete, the resulting {@link CompletableFuture} is completed.
12345 *
12446 * @throws IllegalStateException if the most recent execution was already recorded or the execution is complete
12547 */
126- public void recordFailure (Throwable failure ) {
127- record (null , failure );
128- }
129-
130- /**
131- * Performs an asynchronous execution.
132- */
133- void executeAsync () {
134- outerFn .apply (this ).whenComplete (this ::complete );
135- }
136-
137- private void complete (ExecutionResult result , Throwable error ) {
138- if (result == null && error == null )
139- return ;
140-
141- completed = true ;
142- if (!future .isDone ()) {
143- if (result != null )
144- future .completeResult (result );
145- else {
146- if (error instanceof CompletionException )
147- error = error .getCause ();
148- future .completeResult (ExecutionResult .failure (error ));
149- }
150- }
151- }
152-
153- synchronized void setPostExecuted (int policyIndex ) {
154- policyPostExecuted [policyIndex ] = true ;
155- }
156-
157- synchronized boolean isPostExecuted (int policyIndex ) {
158- return policyPostExecuted [policyIndex ];
159- }
160-
161- boolean isAsyncExecution () {
162- return asyncExecution ;
163- }
164-
165- AsyncExecution <R > copy () {
166- return new AsyncExecution <>(this );
167- }
168- }
48+ void recordFailure (Throwable failure );
49+ }
0 commit comments