Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,19 +531,22 @@ final void doExec() {
* still correct, although it may contain a misleading stack
* trace.
*
* @param asExecutionException true if wrap as ExecutionException
* @param asExecutionException true if wrap the result as an
* ExecutionException. This applies only to actual exceptions, not
* implicit CancellationExceptions issued when not THROWN or
* available, which are not wrapped because by default they are
* issued separately from ExecutionExceptions by callers. Which
* may require further handling when this is not true (currently
* only in InvokeAnyTask).
* @return the exception, or null if none
*/
private Throwable getException(boolean asExecutionException) {
int s; Throwable ex; Aux a;
if ((s = status) >= 0 || (s & ABNORMAL) == 0)
return null;
else if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null) {
ex = new CancellationException();
if (!asExecutionException || !(this instanceof InterruptibleTask))
return ex; // else wrap below
}
else if (a.thread != Thread.currentThread()) {
if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null)
return new CancellationException();
if (a.thread != Thread.currentThread()) {
try {
Constructor<?> noArgCtor = null, oneArgCtor = null;
for (Constructor<?> c : ex.getClass().getConstructors()) {
Expand Down Expand Up @@ -1814,6 +1817,8 @@ final T invokeAny(Collection<? extends Callable<T>> tasks,
(t = new InvokeAnyTask<T>(c, this, t)));
}
return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();
} catch (CancellationException ce) {
throw new ExecutionException(ce);
} finally {
for (; t != null; t = t.pred)
t.onRootCompletion();
Expand Down
61 changes: 58 additions & 3 deletions test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@
* http://creativecommons.org/publicdomain/zero/1.0/
*/

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand All @@ -51,13 +49,19 @@
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import junit.framework.Test;
import junit.framework.TestSuite;

import static java.util.concurrent.TimeUnit.*;

public class ForkJoinPoolTest extends JSR166TestCase {
public static void main(String[] args) {
main(suite(), args);
Expand Down Expand Up @@ -482,6 +486,57 @@ public void testSubmitRunnable() throws Throwable {
}
}

public void testCancellationExceptionInGet() throws Exception {
final ExecutorService e = new ForkJoinPool(1);
try (var cleaner = cleaner(e)) {
assertCancellationExceptionFrom(
e::submit,
f -> () -> f.get(1000, TimeUnit.SECONDS)
);
assertCancellationExceptionFrom(
e::submit,
f -> f::get
);
assertCancellationExceptionFrom(
c -> e.submit(() -> { try { c.call(); } catch (Exception ex) { throw new RuntimeException(ex); } }),
f -> () -> f.get(1000, TimeUnit.SECONDS)
);
assertCancellationExceptionFrom(
c -> e.submit(() -> { try { c.call(); } catch (Exception ex) { throw new RuntimeException(ex); } }),
f -> f::get
);
}
}

private void assertCancellationExceptionFrom(
Function<Callable<Void>, Future<?>> createTask,
Function<Future<?>, Callable<?>> getResult) throws Exception {
final var t = new AtomicReference<Thread>();
final var c = new CountDownLatch(1); // Only used to induce WAITING state (never counted down)
final var task = createTask.apply(() -> {
try {
t.set(Thread.currentThread());
c.await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();;
}
return null;
});
Thread taskThread;
while((taskThread = t.get()) == null || taskThread.getState() != Thread.State.WAITING) {
if (Thread.interrupted())
throw new InterruptedException();
Thread.onSpinWait();
}
task.cancel(true);
try {
getResult.apply(task).call();
} catch (CancellationException ce) {
return; // Success
}
shouldThrow();
}

/**
* Completed submit(runnable, result) returns result
*/
Expand Down