Skip to content

Commit 2ecde20

Browse files
committed
async http client
1 parent feba333 commit 2ecde20

File tree

6 files changed

+387
-77
lines changed

6 files changed

+387
-77
lines changed

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,18 @@
130130
<version>4.5.5</version>
131131
</dependency>
132132

133+
<dependency>
134+
<groupId>org.apache.httpcomponents</groupId>
135+
<artifactId>httpcore-nio</artifactId>
136+
<version>4.4.9</version>
137+
</dependency>
138+
139+
<dependency>
140+
<groupId>org.apache.httpcomponents</groupId>
141+
<artifactId>httpasyncclient</artifactId>
142+
<version>4.1.2</version>
143+
</dependency>
144+
133145
<dependency>
134146
<groupId>commons-io</groupId>
135147
<artifactId>commons-io</artifactId>

src/main/java/code/concurrency/util/ConcurrentTest.java

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
import org.junit.Test;
44

5-
import java.util.ArrayList;
6-
import java.util.List;
75
import java.util.Random;
86
import java.util.concurrent.*;
97
import java.util.concurrent.atomic.AtomicInteger;
8+
import java.util.function.Supplier;
109

1110
/**
1211
*〈并发模拟测试〉<p>
@@ -31,28 +30,22 @@ public class ConcurrentTest {
3130
* 原子变量,线程并发处理
3231
*/
3332
@Test
34-
public void testAtomic(){
35-
try {
36-
AtomicInteger atomicCount = new AtomicInteger(TASK_NUM);
37-
ExecutorService executor = Executors.newFixedThreadPool(100);
38-
List<Future> futureList = new ArrayList<Future>(TASK_NUM);
39-
Future future = null;
40-
for(int i=0;i<TASK_NUM;i++){
41-
future = executor.submit(new AtomicTask(atomicCount));
42-
futureList.add(future);
43-
}
44-
for(int i=0;i<TASK_NUM;i++){
45-
future = futureList.get(i);
46-
System.out.println("Task"+ i +", result:"+ future.get(20, TimeUnit.MILLISECONDS));
47-
}
48-
executor.shutdown();
49-
} catch (InterruptedException e) {
50-
e.printStackTrace();
51-
} catch (ExecutionException e) {
52-
e.printStackTrace();
53-
} catch (TimeoutException e) {
54-
e.printStackTrace();
33+
public void testAtomic() throws InterruptedException {
34+
AtomicInteger atomicCount = new AtomicInteger(TASK_NUM);
35+
ExecutorService executor = Executors.newFixedThreadPool(100);
36+
37+
for(int i=0; i<TASK_NUM; i++){
38+
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new AtomicTask(atomicCount), executor);
39+
future.whenComplete((r, t) ->{
40+
if(t != null){
41+
t.printStackTrace();
42+
}else{
43+
System.out.println("Task result:"+ r);
44+
}
45+
});
5546
}
47+
executor.shutdown();
48+
executor.awaitTermination(10, TimeUnit.SECONDS);
5649
}
5750

5851
/**
@@ -125,7 +118,7 @@ public void testCyclicBarrier(){
125118
/**
126119
* 原子变量,线程安全
127120
*/
128-
class AtomicTask implements Callable<Integer>{
121+
class AtomicTask implements Supplier<Integer>{
129122

130123
private AtomicInteger atomicCount;
131124

@@ -134,7 +127,7 @@ public AtomicTask(AtomicInteger atomicCount){
134127
}
135128

136129
@Override
137-
public Integer call() throws Exception {
130+
public Integer get() {
138131
Random random = new Random();
139132
try {
140133
Thread.sleep(random.nextInt(5));
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package code.http;
2+
3+
import lombok.Getter;
4+
import org.apache.http.HttpEntity;
5+
import org.apache.http.HttpResponse;
6+
import org.apache.http.client.methods.HttpPost;
7+
import org.apache.http.concurrent.FutureCallback;
8+
import org.apache.http.entity.ContentType;
9+
import org.apache.http.entity.StringEntity;
10+
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
11+
12+
import java.io.Closeable;
13+
import java.io.IOException;
14+
import java.util.concurrent.Future;
15+
16+
import static code.http.HttpClientBuilder.IdleConnectionEvictor;
17+
18+
/**
19+
* 〈异步HttpClient〉<p>
20+
* 〈功能详细描述〉
21+
*
22+
* @author zixiao
23+
* @date 2019/6/17
24+
*/
25+
public class AsyncHttpClient implements Closeable {
26+
27+
@Getter
28+
private final CloseableHttpAsyncClient client;
29+
30+
private IdleConnectionEvictor connEvictor;
31+
32+
public AsyncHttpClient(CloseableHttpAsyncClient client) {
33+
this.client = client;
34+
}
35+
36+
public AsyncHttpClient(CloseableHttpAsyncClient client, IdleConnectionEvictor connEvictor) {
37+
this.client = client;
38+
this.connEvictor = connEvictor;
39+
}
40+
41+
/**
42+
* Post方式
43+
*
44+
* @param url 请求地址
45+
* @param config 请求配置
46+
* @param json json格式字符串
47+
* @param callback 回调方法
48+
* @return
49+
* @throws IOException
50+
*/
51+
public Future<HttpResponse> post(String url, HttpRequestConfig config, String json, FutureCallback<HttpResponse> callback) throws IOException{
52+
HttpEntity entity = new StringEntity(json, ContentType.create("application/json", config.getCharset()));
53+
54+
HttpPost httpPost = new HttpPost(url);
55+
httpPost.setEntity(entity);
56+
57+
return client.execute(httpPost, callback);
58+
}
59+
60+
/**
61+
* Post方式
62+
*
63+
* @param url 请求地址
64+
* @param config 请求配置
65+
* @param json json格式字符串
66+
* @return
67+
* @throws IOException
68+
*/
69+
public Future<HttpResponse> post(String url, HttpRequestConfig config, String json) throws IOException{
70+
HttpEntity entity = new StringEntity(json, ContentType.create("application/json", config.getCharset()));
71+
72+
HttpPost httpPost = new HttpPost(url);
73+
httpPost.setEntity(entity);
74+
75+
return post(url, config, json, null);
76+
}
77+
78+
@Override
79+
public void close() throws IOException {
80+
if(connEvictor != null){
81+
connEvictor.shutdown();
82+
}
83+
if(client != null){
84+
client.close();
85+
}
86+
}
87+
88+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package code.http;
2+
3+
import org.apache.http.HttpResponse;
4+
import org.apache.http.concurrent.FutureCallback;
5+
import org.junit.Test;
6+
7+
import javax.net.ssl.SSLPeerUnverifiedException;
8+
import java.io.IOException;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.Future;
11+
12+
/**
13+
* 〈一句话功能简述〉<p>
14+
* 〈功能详细描述〉
15+
*
16+
* @author zixiao
17+
* @date 2019/5/16
18+
*/
19+
public class AsyncHttpClientTest {
20+
21+
private AsyncHttpClient asyncHttpClient;
22+
23+
@Test
24+
public void sslTest() throws IOException, ExecutionException, InterruptedException {
25+
asyncHttpClient = HttpClientBuilder.custom().ssl().buildAsync();
26+
String url = "https://www.baidu.com";
27+
asyncHttpClient.post(url, HttpRequestConfig.getDefault(), "hello", new FutureCallback<HttpResponse>() {
28+
@Override
29+
public void completed(HttpResponse result) {
30+
System.out.printf("Result: "+result);
31+
}
32+
33+
@Override
34+
public void failed(Exception ex) {
35+
ex.printStackTrace();
36+
}
37+
38+
@Override
39+
public void cancelled() {
40+
41+
}
42+
});
43+
Thread.sleep(3000);
44+
asyncHttpClient.close();
45+
}
46+
47+
@Test
48+
public void noSslTest() throws IOException, ExecutionException, InterruptedException {
49+
asyncHttpClient = HttpClientBuilder.custom().buildAsync();
50+
String url = "https://www.baidu.com";
51+
try {
52+
Future<HttpResponse> future = post(url, "hello");
53+
System.out.println(future.get());
54+
} catch (Exception e) {
55+
if(e.getMessage().contains("UnsupportedSchemeException")){
56+
System.out.println(e.getMessage());
57+
}else {
58+
throw e;
59+
}
60+
}
61+
}
62+
63+
@Test
64+
public void verifyHostTest() throws IOException, ExecutionException, InterruptedException {
65+
asyncHttpClient = HttpClientBuilder.custom().ssl().verifyHostname(true).buildAsync();
66+
String url = "https://127.0.0.1:8443/hello";
67+
try {
68+
Future<HttpResponse> future = post(url, "hello");
69+
System.out.println(future.get());
70+
} catch (Exception e) {
71+
if(e instanceof SSLPeerUnverifiedException){
72+
System.out.println(e.getMessage());
73+
}else {
74+
throw e;
75+
}
76+
}
77+
}
78+
79+
@Test
80+
public void disableVerifyHostTest() throws IOException, ExecutionException, InterruptedException {
81+
asyncHttpClient = HttpClientBuilder.custom().ssl().verifyHostname(false).buildAsync();
82+
String url = "https://127.0.0.1:8443/hello";
83+
Future<HttpResponse> future = post(url, "hello");
84+
System.out.println(future.get());
85+
}
86+
87+
@Test
88+
public void trustStore() throws IOException, ExecutionException, InterruptedException {
89+
String path = System.getenv("JAVA_HOME")+"/jre/lib/security/cacerts";
90+
asyncHttpClient = HttpClientBuilder.custom().ssl(path, "changeit").buildAsync();
91+
String url = "https://www.baidu.com";
92+
Future<HttpResponse> future = post(url, "hello");
93+
System.out.println(future.get());
94+
}
95+
96+
@Test
97+
public void keyStore() throws IOException, ExecutionException, InterruptedException {
98+
String path = "/Users/zixiao/keys/tomcat.jks";
99+
asyncHttpClient = HttpClientBuilder.custom()
100+
.ssl2(path, "123456", "123456")
101+
.verifyHostname(false)
102+
.buildAsync();
103+
String url = "https://127.0.0.1:8443";
104+
Future<HttpResponse> future = post(url, "hello");
105+
System.out.println(future.get());
106+
}
107+
108+
/**
109+
* POST方式
110+
*
111+
* @param url URL
112+
* @param content 请求体
113+
* @return 响应结果
114+
* @throws IOException
115+
*/
116+
public Future<HttpResponse> post(String url, String content) throws IOException {
117+
return asyncHttpClient.post(url, HttpRequestConfig.getDefault(), content);
118+
}
119+
120+
121+
}

0 commit comments

Comments
 (0)