Skip to content

Commit 17c6b70

Browse files
committed
promise/future模式
1 parent da5d934 commit 17c6b70

File tree

6 files changed

+73
-110
lines changed

6 files changed

+73
-110
lines changed

src/main/java/code/concurrency/promise/AbstractFuture.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.concurrent.atomic.AtomicReference;
88
import java.util.concurrent.locks.Lock;
99
import java.util.concurrent.locks.ReentrantLock;
10+
import java.util.function.BiConsumer;
1011

1112
/**
1213
* 〈AbstractFuture〉<p>
@@ -31,36 +32,34 @@ public abstract class AbstractFuture<R> implements Future<R> {
3132
protected AtomicReference<Callback> callbackReference = new AtomicReference<>();
3233

3334
@Override
34-
public boolean isDone(){
35-
return done;
35+
public boolean isDone() {
36+
return done;
3637
}
3738

3839
@Override
39-
public boolean isSuccess(){
40+
public boolean isSuccess() {
4041
return isDone() && result != null;
4142
}
4243

4344
@Override
44-
public boolean isCancelled(){
45+
public boolean isCancelled() {
4546
return isDone() && cancelled;
4647
}
4748

4849
@Override
49-
public boolean cancel(){
50+
public boolean cancel() {
5051
throw new UnsupportedOperationException();
5152
}
5253

5354
@Override
5455
public R get() throws InterruptedException, ExecutionException {
55-
if(!isDone()){
56+
if (!isDone()) {
5657
latch.await();
5758
}
5859

59-
if(isSuccess()){
60+
if (isSuccess()) {
6061
return result;
61-
}
62-
63-
if(isCancelled()){
62+
} else if (isCancelled()) {
6463
throw new ExecutionException("Task has been cancelled", null);
6564
}
6665

@@ -69,27 +68,45 @@ public R get() throws InterruptedException, ExecutionException {
6968

7069
@Override
7170
public R get(long timeout, TimeUnit unit)
72-
throws InterruptedException, ExecutionException, TimeoutException{
73-
if(!isDone()){
74-
if(latch.await(timeout, unit) && !isDone()){
71+
throws InterruptedException, ExecutionException, TimeoutException {
72+
if (!isDone()) {
73+
if (latch.await(timeout, unit) && !isDone()) {
7574
throw new TimeoutException("Get result timeout");
7675
}
7776
}
7877

79-
if(isSuccess()){
78+
if (isSuccess()) {
8079
return result;
81-
}
82-
83-
if(isCancelled()){
80+
} else if (isCancelled()) {
8481
throw new ExecutionException("Task has been cancelled", null);
8582
}
8683

8784
throw new ExecutionException(cause);
8885
}
8986

90-
protected void executeCallbackOnce(){
87+
@Override
88+
public Future<R> whenComplete(BiConsumer<? super R, ? super Throwable> callback) {
89+
callbackReference.compareAndSet(null, () -> callback.accept(this.result, this.cause));
90+
91+
// 执行时间极端或线程池满的情况下,放入callback前已经完成,此处需要主动执行回调
92+
if (isDone()) {
93+
executeOnlyOnce();
94+
}
95+
return this;
96+
}
97+
98+
@Override
99+
public void setDone() {
100+
this.done = true;
101+
latch.countDown();
102+
103+
// 精确执行一次回调
104+
executeOnlyOnce();
105+
}
106+
107+
private void executeOnlyOnce() {
91108
Callback callback = callbackReference.getAndSet(null);
92-
if(callback != null){
109+
if (callback != null) {
93110
callback.call();
94111
}
95112
}

src/main/java/code/concurrency/promise/DefaultFuture.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

src/main/java/code/concurrency/promise/DefaultPromise.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
* @author zixiao
88
* @date 2019/6/11
99
*/
10-
public class DefaultPromise<R> extends DefaultFuture<R> implements Promise<R> {
10+
public class DefaultPromise<R> extends AbstractFuture<R> implements Promise<R> {
1111

1212
@Override
1313
public Future<R> getFuture() {
@@ -16,69 +16,60 @@ public Future<R> getFuture() {
1616

1717
@Override
1818
public Promise<R> setSuccess(R result) throws IllegalStateException {
19-
if(trySuccess(result)){
19+
if (trySuccess(result)) {
2020
return this;
2121
}
2222
throw new IllegalStateException("Set success exception.");
2323
}
2424

2525
@Override
2626
public boolean trySuccess(R result) {
27-
if(result == null){
27+
if (result == null) {
2828
throw new NullPointerException("Result can not be null");
2929
}
30-
if(isDone()){
30+
if (isDone()) {
3131
return false;
3232
}
3333

3434
doneLock.lock();
3535
try {
36-
if(!isDone()){
36+
if (!isDone()) {
3737
this.result = result;
3838
setDone();
3939
return true;
4040
}
41-
}finally {
41+
} finally {
4242
doneLock.unlock();
4343
}
4444
return false;
4545
}
4646

47-
/**
48-
* 设置为完成
49-
*/
50-
private void setDone(){
51-
this.done = true;
52-
latch.countDown();
53-
54-
executeCallbackOnce();
55-
}
56-
5747
@Override
5848
public Promise<R> setFailure(Throwable cause) {
59-
if(tryFailure(cause)){
49+
if (tryFailure(cause)) {
6050
return this;
6151
}
6252
throw new IllegalStateException("Set failure exception.");
6353
}
6454

6555
@Override
6656
public boolean tryFailure(Throwable cause) {
67-
if(cause == null){
57+
if (cause == null) {
6858
throw new NullPointerException("Cause can not be null");
6959
}
70-
if(isDone()){
60+
61+
if (isDone()) {
7162
return false;
7263
}
7364

7465
doneLock.lock();
7566
try {
76-
if(!isDone()){
67+
if (!isDone()) {
7768
this.cause = cause;
7869
setDone();
7970
return true;
8071
}
81-
}finally {
72+
} finally {
8273
doneLock.unlock();
8374
}
8475
return false;

src/main/java/code/concurrency/promise/DefaultPromisor.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,20 @@ public DefaultPromisor(ExecutorService executorService) {
2121
* 启动异步任务的执行,并返回用于获取异步任务执行结果的凭据对象
2222
*/
2323
@Override
24-
public <R> Future<R> asyncExecute(Callable<R> callable){
25-
if(callable == null){
24+
public <R> Future<R> asyncExecute(Callable<R> callable) {
25+
if (callable == null) {
2626
throw new NullPointerException("Callable can not be null");
2727
}
2828
Promise<R> promise = new DefaultPromise<>();
2929
try {
30-
executorService.execute(new PromiseTask<>(promise, callable));
30+
executorService.execute(() -> {
31+
try {
32+
R r = callable.call();
33+
promise.trySuccess(r);
34+
} catch (Exception e) {
35+
promise.tryFailure(e);
36+
}
37+
});
3138
} catch (RejectedExecutionException e) {
3239
promise.setFailure(e);
3340
}
@@ -40,35 +47,13 @@ public <R> R execute(Callable<R> callable) throws ExecutionException, Interrupte
4047
return future.get();
4148
}
4249

43-
private static class PromiseTask<R> implements Runnable{
44-
45-
private Promise<R> promise;
46-
47-
private Callable<R> callable;
48-
49-
public PromiseTask(Promise<R> promise, Callable<R> callable) {
50-
this.promise = promise;
51-
this.callable = callable;
52-
}
53-
54-
@Override
55-
public void run(){
56-
try {
57-
R r = callable.call();
58-
promise.trySuccess(r);
59-
} catch (Exception e) {
60-
promise.tryFailure(e);
61-
}
62-
}
63-
}
64-
6550
@Override
66-
public void shutdown(long time, TimeUnit unit){
51+
public void shutdown(long time, TimeUnit unit) {
6752
executorService.shutdown();
6853
try {
6954
executorService.awaitTermination(10, TimeUnit.SECONDS);
7055
} catch (InterruptedException e) {
71-
e.printStackTrace();
56+
Thread.currentThread().interrupt();
7257
}
7358
}
7459

src/main/java/code/concurrency/promise/Future.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ public interface Future <R> {
5656
*/
5757
R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
5858

59+
/**
60+
* 完成回调
61+
* @param callback
62+
* @return
63+
*/
5964
Future<R> whenComplete(BiConsumer<? super R, ? super Throwable> callback);
6065

66+
/**
67+
* 设置完成
68+
*/
69+
void setDone();
70+
6171
}

src/main/java/code/concurrency/promise/PromiseTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package code.concurrency.promise;
22

3-
import code.util.ThreadPool;
43
import org.junit.Test;
54

65
import java.text.MessageFormat;
@@ -15,13 +14,13 @@
1514
*/
1615
public class PromiseTest {
1716

18-
private ExecutorService executorService = ThreadPool.create(100, 100, 1024);
17+
private ExecutorService executorService = new ThreadPoolExecutor(64, 64, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(256));
1918

2019
private Promisor promisor = new DefaultPromisor(executorService);
2120

2221
@Test
23-
public void testAsync(){
24-
for (int i = 0; i < 100; i++) {
22+
public void testAsync() {
23+
for (int i = 0; i < 2048; i++) {
2524
promisor.asyncExecute(new Task(i)).whenComplete((s, t) -> {
2625
if (t != null) {
2726
System.out.println("Completed fail: " + t.getMessage());

0 commit comments

Comments
 (0)