Skip to content

Commit 32b6e3b

Browse files
committed
promise/future
1 parent 8a37160 commit 32b6e3b

File tree

9 files changed

+535
-0
lines changed

9 files changed

+535
-0
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package code.concurrency.promise;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.ExecutionException;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.TimeoutException;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
import java.util.concurrent.locks.Lock;
9+
import java.util.concurrent.locks.ReentrantLock;
10+
11+
/**
12+
* 〈AbstractFuture〉<p>
13+
*
14+
* @author zixiao
15+
* @date 2019/6/11
16+
*/
17+
public abstract class AbstractFuture<R> implements Future<R> {
18+
19+
protected volatile R result;
20+
21+
protected Throwable cause;
22+
23+
protected boolean done = false;
24+
25+
protected boolean cancelled = false;
26+
27+
protected CountDownLatch latch = new CountDownLatch(1);
28+
29+
protected Lock doneLock = new ReentrantLock();
30+
31+
protected AtomicReference<Callback> callbackReference = new AtomicReference<>();
32+
33+
@Override
34+
public boolean isDone(){
35+
return done;
36+
}
37+
38+
@Override
39+
public boolean isSuccess(){
40+
return isDone() && result != null;
41+
}
42+
43+
@Override
44+
public boolean isCancelled(){
45+
return isDone() && cancelled;
46+
}
47+
48+
@Override
49+
public boolean cancel(){
50+
throw new UnsupportedOperationException();
51+
}
52+
53+
@Override
54+
public R get() throws InterruptedException, ExecutionException {
55+
if(!isDone()){
56+
latch.await();
57+
}
58+
59+
if(isSuccess()){
60+
return result;
61+
}
62+
63+
if(isCancelled()){
64+
throw new ExecutionException("Task has been cancelled", null);
65+
}
66+
67+
throw new ExecutionException(cause);
68+
}
69+
70+
@Override
71+
public R get(long timeout, TimeUnit unit)
72+
throws InterruptedException, ExecutionException, TimeoutException{
73+
if(!isDone()){
74+
if(latch.await(timeout, unit) && !isDone()){
75+
throw new TimeoutException("Get result timeout");
76+
}
77+
}
78+
79+
if(isSuccess()){
80+
return result;
81+
}
82+
83+
if(isCancelled()){
84+
throw new ExecutionException("Task has been cancelled", null);
85+
}
86+
87+
throw new ExecutionException(cause);
88+
}
89+
90+
protected void executeCallbackOnce(){
91+
Callback callback = callbackReference.getAndSet(null);
92+
if(callback != null){
93+
callback.call();
94+
}
95+
}
96+
97+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package code.concurrency.promise;
2+
3+
/**
4+
* 〈Callback〉<p>
5+
* 〈功能详细描述〉
6+
*
7+
* @author zixiao
8+
* @date 2019/6/12
9+
*/
10+
public interface Callback {
11+
12+
void call();
13+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package code.concurrency.promise;
2+
3+
import java.util.function.BiConsumer;
4+
5+
/**
6+
* 〈DefaultFuture〉<p>
7+
* 〈功能详细描述〉
8+
*
9+
* @author zixiao
10+
* @date 2019/6/12
11+
*/
12+
public class DefaultFuture<R> extends AbstractFuture<R>{
13+
14+
@Override
15+
public Future<R> whenComplete(BiConsumer<? super R, ? super Throwable> callback){
16+
callbackReference.compareAndSet(null, new DefaultCallback(callback, this));
17+
if(isDone()){
18+
executeCallbackOnce();
19+
}
20+
return this;
21+
}
22+
23+
private class DefaultCallback implements Callback {
24+
25+
private BiConsumer<? super R, ? super Throwable> callback;
26+
27+
private AbstractFuture<R> future;
28+
29+
public DefaultCallback(BiConsumer<? super R, ? super Throwable> callback, AbstractFuture<R> future) {
30+
this.callback = callback;
31+
this.future = future;
32+
}
33+
34+
public void call(){
35+
callback.accept(future.result, future.cause);
36+
}
37+
}
38+
39+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package code.concurrency.promise;
2+
3+
/**
4+
* 〈DefaultPromise〉<p>
5+
* 〈功能详细描述〉
6+
*
7+
* @author zixiao
8+
* @date 2019/6/11
9+
*/
10+
public class DefaultPromise<R> extends DefaultFuture<R> implements Promise<R> {
11+
12+
@Override
13+
public Future<R> getFuture() {
14+
return this;
15+
}
16+
17+
@Override
18+
public Promise<R> setSuccess(R result) throws IllegalStateException {
19+
if(trySuccess(result)){
20+
return this;
21+
}
22+
throw new IllegalStateException("Set success exception.");
23+
}
24+
25+
@Override
26+
public boolean trySuccess(R result) {
27+
if(result == null){
28+
throw new NullPointerException("Result can not be null");
29+
}
30+
if(isDone()){
31+
return false;
32+
}
33+
34+
doneLock.lock();
35+
try {
36+
if(!isDone()){
37+
this.result = result;
38+
setDone();
39+
return true;
40+
}
41+
}finally {
42+
doneLock.unlock();
43+
}
44+
return false;
45+
}
46+
47+
/**
48+
* 设置为完成
49+
*/
50+
private void setDone(){
51+
this.done = true;
52+
latch.countDown();
53+
54+
executeCallbackOnce();
55+
}
56+
57+
@Override
58+
public Promise<R> setFailure(Throwable cause) {
59+
if(tryFailure(cause)){
60+
return this;
61+
}
62+
throw new IllegalStateException("Set failure exception.");
63+
}
64+
65+
@Override
66+
public boolean tryFailure(Throwable cause) {
67+
if(cause == null){
68+
throw new NullPointerException("Cause can not be null");
69+
}
70+
if(isDone()){
71+
return false;
72+
}
73+
74+
doneLock.lock();
75+
try {
76+
if(!isDone()){
77+
this.cause = cause;
78+
setDone();
79+
return true;
80+
}
81+
}finally {
82+
doneLock.unlock();
83+
}
84+
return false;
85+
}
86+
87+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package code.concurrency.promise;
2+
3+
import java.util.concurrent.*;
4+
5+
/**
6+
* 〈DefaultPromisor〉<p>
7+
* 〈功能详细描述〉
8+
*
9+
* @author zixiao
10+
* @date 2019/6/3
11+
*/
12+
public class DefaultPromisor implements Promisor {
13+
14+
private ExecutorService executorService;
15+
16+
public DefaultPromisor(ExecutorService executorService) {
17+
this.executorService = executorService;
18+
}
19+
20+
/**
21+
* 启动异步任务的执行,并返回用于获取异步任务执行结果的凭据对象
22+
*/
23+
@Override
24+
public <R> Future<R> asyncExecute(Callable<R> callable){
25+
if(callable == null){
26+
throw new NullPointerException("Callable can not be null");
27+
}
28+
Promise<R> promise = new DefaultPromise<>();
29+
try {
30+
executorService.execute(new PromiseTask<>(promise, callable));
31+
} catch (RejectedExecutionException e) {
32+
promise.setFailure(e);
33+
}
34+
return promise.getFuture();
35+
}
36+
37+
@Override
38+
public <R> R execute(Callable<R> callable) throws ExecutionException, InterruptedException {
39+
Future<R> future = asyncExecute(callable);
40+
return future.get();
41+
}
42+
43+
private static class PromiseTask<R> implements Runnable{
44+
45+
private Promise<R> promise;
46+
47+
private Callable<R> callable;
48+
49+
public PromiseTask(Promise<R> promise, Callable<R> callable) {
50+
this.promise = promise;
51+
this.callable = callable;
52+
}
53+
54+
@Override
55+
public void run(){
56+
try {
57+
R r = callable.call();
58+
promise.trySuccess(r);
59+
} catch (Exception e) {
60+
promise.tryFailure(e);
61+
}
62+
}
63+
}
64+
65+
@Override
66+
public void shutdown(long time, TimeUnit unit){
67+
executorService.shutdown();
68+
try {
69+
executorService.awaitTermination(10, TimeUnit.SECONDS);
70+
} catch (InterruptedException e) {
71+
e.printStackTrace();
72+
}
73+
}
74+
75+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package code.concurrency.promise;
2+
3+
import java.util.concurrent.ExecutionException;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.TimeoutException;
6+
import java.util.function.BiConsumer;
7+
8+
/**
9+
* 〈Future〉<p>
10+
* +---------------------------+
11+
* | Completed successfully |
12+
* +---------------------------+
13+
* +----> isDone() = true |
14+
* +--------------------------+ | | result = non-null |
15+
* | Uncompleted | | +===========================+
16+
* +--------------------------+ | | Completed with failure |
17+
* | isDone() = false | | +---------------------------+
18+
* | result = null |----+----> isDone() = true |
19+
* | isCancelled() = false | | | cause() = non-null |
20+
* | cause() = null | | +===========================+
21+
* +--------------------------+ | | Completed by cancellation |
22+
* | +---------------------------+
23+
* +----> isDone() = true |
24+
* | isCancelled() = true |
25+
* +---------------------------+
26+
*
27+
* @author zixiao
28+
* @date 2019/6/11
29+
*/
30+
public interface Future <R> {
31+
32+
boolean isDone();
33+
34+
boolean isSuccess();
35+
36+
boolean isCancelled();
37+
38+
boolean cancel();
39+
40+
/**
41+
* 阻塞直到取得异步操作结果
42+
* @return
43+
* @throws InterruptedException
44+
* @throws ExecutionException
45+
*/
46+
R get() throws InterruptedException, ExecutionException;
47+
48+
/**
49+
* 阻塞直到超时或取得异步操作结果
50+
* @param timeout
51+
* @param unit
52+
* @return
53+
* @throws InterruptedException
54+
* @throws ExecutionException
55+
* @throws TimeoutException
56+
*/
57+
R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
58+
59+
Future<R> whenComplete(BiConsumer<? super R, ? super Throwable> callback);
60+
61+
}

0 commit comments

Comments
 (0)