diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index 51b2488264e..137cac45ed0 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -531,19 +531,22 @@ final void doExec() { * still correct, although it may contain a misleading stack * trace. * - * @param asExecutionException true if wrap as ExecutionException + * @param asExecutionException true if wrap the result as an + * ExecutionException. This applies only to actual exceptions, not + * implicit CancellationExceptions issued when not THROWN or + * available, which are not wrapped because by default they are + * issued separately from ExecutionExceptions by callers. Which + * may require further handling when this is not true (currently + * only in InvokeAnyTask). * @return the exception, or null if none */ private Throwable getException(boolean asExecutionException) { int s; Throwable ex; Aux a; if ((s = status) >= 0 || (s & ABNORMAL) == 0) return null; - else if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null) { - ex = new CancellationException(); - if (!asExecutionException || !(this instanceof InterruptibleTask)) - return ex; // else wrap below - } - else if (a.thread != Thread.currentThread()) { + if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null) + return new CancellationException(); + if (a.thread != Thread.currentThread()) { try { Constructor noArgCtor = null, oneArgCtor = null; for (Constructor c : ex.getClass().getConstructors()) { @@ -1814,6 +1817,8 @@ final T invokeAny(Collection> tasks, (t = new InvokeAnyTask(c, this, t))); } return timed ? get(nanos, TimeUnit.NANOSECONDS) : get(); + } catch (CancellationException ce) { + throw new ExecutionException(ce); } finally { for (; t != null; t = t.pred) t.onRootCompletion(); diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java index 9422128511e..de1caaab2d9 100644 --- a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java @@ -31,9 +31,6 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; - import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -41,6 +38,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -51,13 +49,19 @@ import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import junit.framework.Test; import junit.framework.TestSuite; +import static java.util.concurrent.TimeUnit.*; + public class ForkJoinPoolTest extends JSR166TestCase { public static void main(String[] args) { main(suite(), args); @@ -482,6 +486,57 @@ public void testSubmitRunnable() throws Throwable { } } + public void testCancellationExceptionInGet() throws Exception { + final ExecutorService e = new ForkJoinPool(1); + try (var cleaner = cleaner(e)) { + assertCancellationExceptionFrom( + e::submit, + f -> () -> f.get(1000, TimeUnit.SECONDS) + ); + assertCancellationExceptionFrom( + e::submit, + f -> f::get + ); + assertCancellationExceptionFrom( + c -> e.submit(() -> { try { c.call(); } catch (Exception ex) { throw new RuntimeException(ex); } }), + f -> () -> f.get(1000, TimeUnit.SECONDS) + ); + assertCancellationExceptionFrom( + c -> e.submit(() -> { try { c.call(); } catch (Exception ex) { throw new RuntimeException(ex); } }), + f -> f::get + ); + } + } + + private void assertCancellationExceptionFrom( + Function, Future> createTask, + Function, Callable> getResult) throws Exception { + final var t = new AtomicReference(); + final var c = new CountDownLatch(1); // Only used to induce WAITING state (never counted down) + final var task = createTask.apply(() -> { + try { + t.set(Thread.currentThread()); + c.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt();; + } + return null; + }); + Thread taskThread; + while((taskThread = t.get()) == null || taskThread.getState() != Thread.State.WAITING) { + if (Thread.interrupted()) + throw new InterruptedException(); + Thread.onSpinWait(); + } + task.cancel(true); + try { + getResult.apply(task).call(); + } catch (CancellationException ce) { + return; // Success + } + shouldThrow(); + } + /** * Completed submit(runnable, result) returns result */