Skip to content

Commit 3e0f2c3

Browse files
author
方卓
committed
Release v3.2.1
1 parent f2b3bbe commit 3e0f2c3

File tree

8 files changed

+348
-123
lines changed

8 files changed

+348
-123
lines changed

SensorsAnalyticsSDK/pom.xml

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<groupId>com.sensorsdata.analytics.javasdk</groupId>
1616
<name>SensorsAnalyticsSDK</name>
1717
<artifactId>SensorsAnalyticsSDK</artifactId>
18-
<version>3.2.0</version>
18+
<version>3.2.1</version>
1919
<description>The official Java SDK of Sensors Analytics</description>
2020
<url>http://sensorsdata.cn</url>
2121

@@ -27,6 +27,11 @@
2727
</licenses>
2828

2929
<developers>
30+
<developer>
31+
<name>fangzhuo</name>
32+
<email>fangzhuo@sensorsdata.cn</email>
33+
<url>http://sensorsdata.cn</url>
34+
</developer>
3035
<developer>
3136
<name>dengshiwei</name>
3237
<email>dengshiwei@sensorsdata.cn</email>
@@ -43,18 +48,28 @@
4348
<properties>
4449
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
4550
<project.reporting.outputEncoding>utf-8</project.reporting.outputEncoding>
51+
<jdk.version>1.6</jdk.version>
52+
<httpclient.version>4.5.2</httpclient.version>
53+
<jackson-databind.version>2.5.5</jackson-databind.version>
54+
<lombok.version>1.18.20</lombok.version>
4655
</properties>
4756

4857
<dependencies>
4958
<dependency>
5059
<groupId>org.apache.httpcomponents</groupId>
5160
<artifactId>httpclient</artifactId>
52-
<version>4.3.6</version>
61+
<version>${httpclient.version}</version>
5362
</dependency>
5463
<dependency>
5564
<groupId>com.fasterxml.jackson.core</groupId>
5665
<artifactId>jackson-databind</artifactId>
57-
<version>2.8.11.3</version>
66+
<version>${jackson-databind.version}</version>
67+
</dependency>
68+
<dependency>
69+
<groupId>org.projectlombok</groupId>
70+
<artifactId>lombok</artifactId>
71+
<version>${lombok.version}</version>
72+
<scope>provided</scope>
5873
</dependency>
5974
</dependencies>
6075

@@ -65,8 +80,8 @@
6580
<artifactId>maven-compiler-plugin</artifactId>
6681
<version>3.3</version>
6782
<configuration>
68-
<source>1.6</source>
69-
<target>1.6</target>
83+
<source>${jdk.version}</source>
84+
<target>${jdk.version}</target>
7085
<encoding>UTF-8</encoding>
7186
</configuration>
7287
</plugin>

SensorsAnalyticsSDK/src/main/java/com/sensorsdata/analytics/javasdk/SensorsConst.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ private SensorsConst() {
1515
/**
1616
* 当前JDK版本号,注意要和pom文件里面的version保持一致
1717
*/
18-
public static final String SDK_VERSION = "3.2.0";
18+
public static final String SDK_VERSION = "3.2.1";
1919
/**
2020
* 当前语言类型
2121
*/
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.sensorsdata.analytics.javasdk.bean;
2+
3+
4+
5+
import lombok.AllArgsConstructor;
6+
import lombok.Getter;
7+
import lombok.Setter;
8+
import lombok.ToString;
9+
10+
import java.util.List;
11+
import java.util.Map;
12+
13+
/**
14+
* 发送异常数据包装类
15+
*
16+
* @author fangzhuo
17+
* @version 1.0.0
18+
* @since 2021/11/06 10:35
19+
*/
20+
@Setter
21+
@Getter
22+
@AllArgsConstructor
23+
@ToString
24+
public class FailedData {
25+
/**
26+
* 失败原因
27+
*/
28+
private String failedMessage;
29+
/**
30+
* 失败数据
31+
*/
32+
private List<Map<String,Object>> failedData;
33+
}

SensorsAnalyticsSDK/src/main/java/com/sensorsdata/analytics/javasdk/consumer/BatchConsumer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package com.sensorsdata.analytics.javasdk.consumer;
22

3+
import com.sensorsdata.analytics.javasdk.util.SensorsAnalyticsUtil;
4+
35
import com.fasterxml.jackson.core.JsonProcessingException;
46
import com.fasterxml.jackson.databind.ObjectMapper;
5-
import com.sensorsdata.analytics.javasdk.util.SensorsAnalyticsUtil;
67

78
import java.util.LinkedList;
89
import java.util.List;
@@ -25,16 +26,22 @@ public BatchConsumer(final String serverUrl) {
2526
}
2627

2728
public BatchConsumer(final String serverUrl, final int bulkSize) {
28-
this(serverUrl, bulkSize, false);
29+
this(serverUrl, 3, bulkSize);
30+
}
31+
32+
public BatchConsumer(final String serverUrl, final int timeoutSec, final int bulkSize) {
33+
this(serverUrl, bulkSize, timeoutSec, false);
2934
}
3035

31-
public BatchConsumer(final String serverUrl, final int bulkSize, final boolean throwException) {
32-
this(serverUrl, bulkSize, 0, throwException);
36+
public BatchConsumer(final String serverUrl, final int bulkSize, final int timeoutSec,
37+
final boolean throwException) {
38+
this(serverUrl, bulkSize, timeoutSec, 0, throwException);
3339
}
3440

35-
public BatchConsumer(final String serverUrl, final int bulkSize, final int maxCacheSize, final boolean throwException) {
41+
public BatchConsumer(final String serverUrl, final int bulkSize, final int timeoutSec, final int maxCacheSize,
42+
final boolean throwException) {
3643
this.messageList = new LinkedList<Map<String, Object>>();
37-
this.httpConsumer = new HttpConsumer(serverUrl, null);
44+
this.httpConsumer = new HttpConsumer(serverUrl, Math.max(timeoutSec,1));
3845
this.jsonMapper = SensorsAnalyticsUtil.getJsonObjectMapper();
3946
this.bulkSize = Math.min(MAX_FLUSH_BULK_SIZE, bulkSize);
4047
if (maxCacheSize > MAX_CACHE_SIZE) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.sensorsdata.analytics.javasdk.consumer;
2+
3+
import com.sensorsdata.analytics.javasdk.bean.FailedData;
4+
5+
/**
6+
* 异常数据回调
7+
*
8+
* @author fangzhuo
9+
* @version 1.0.0
10+
* @since 2021/11/05 23:47
11+
*/
12+
public interface Callback {
13+
/**
14+
* 返回发送失败的数据
15+
*
16+
* @param failedData 失败数据
17+
*/
18+
void onFailed(FailedData failedData);
19+
20+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package com.sensorsdata.analytics.javasdk.consumer;
2+
3+
import com.sensorsdata.analytics.javasdk.bean.FailedData;
4+
import com.sensorsdata.analytics.javasdk.util.SensorsAnalyticsUtil;
5+
6+
import com.fasterxml.jackson.core.JsonProcessingException;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import lombok.NonNull;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.concurrent.LinkedBlockingQueue;
14+
import java.util.concurrent.ScheduledExecutorService;
15+
import java.util.concurrent.ScheduledThreadPoolExecutor;
16+
import java.util.concurrent.TimeUnit;
17+
18+
/**
19+
* 网络批量请求发送,异常快速返回模式
20+
*
21+
* @author fangzhuo
22+
* @version 1.0.0
23+
* @since 2021/11/05 23:48
24+
*/
25+
public class FastBatchConsumer implements Consumer {
26+
27+
private static final int MAX_CACHE_SIZE = 10000;
28+
private static final int MIN_CACHE_SIZE = 1000;
29+
private static final int MIN_BULK_SIZE = 1;
30+
31+
32+
private final LinkedBlockingQueue<Map<String, Object>> buffer;
33+
private final HttpConsumer httpConsumer;
34+
private final ObjectMapper jsonMapper;
35+
private final Callback callback;
36+
private final int bulkSize;
37+
private final ScheduledExecutorService executorService;
38+
39+
public FastBatchConsumer(@NonNull String serverUrl, @NonNull Callback callback) {
40+
this(serverUrl, false, callback);
41+
}
42+
43+
public FastBatchConsumer(@NonNull String serverUrl, int flushSec, final boolean timing, @NonNull Callback callback) {
44+
this(serverUrl, timing, 50, 6000, flushSec, 3, callback);
45+
}
46+
47+
public FastBatchConsumer(@NonNull String serverUrl, final boolean timing, @NonNull Callback callback) {
48+
this(serverUrl, timing, 50, callback);
49+
}
50+
51+
public FastBatchConsumer(@NonNull String serverUrl, final boolean timing, int bulkSize, @NonNull Callback callback) {
52+
this(serverUrl, timing, bulkSize, 6000, callback);
53+
}
54+
55+
public FastBatchConsumer(@NonNull String serverUrl, final boolean timing, int bulkSize, int maxCacheSize,
56+
@NonNull Callback callback) {
57+
this(serverUrl, timing, bulkSize, maxCacheSize, 1, 3, callback);
58+
}
59+
60+
public FastBatchConsumer(@NonNull String serverUrl, final boolean timing, final int bulkSize, int maxCacheSize,
61+
int flushSec, int timeoutSec, @NonNull Callback callback) {
62+
this.buffer =
63+
new LinkedBlockingQueue<Map<String, Object>>(Math.min(Math.max(MIN_CACHE_SIZE, maxCacheSize), MAX_CACHE_SIZE));
64+
this.httpConsumer = new HttpConsumer(serverUrl, Math.max(timeoutSec, 1));
65+
this.jsonMapper = SensorsAnalyticsUtil.getJsonObjectMapper();
66+
this.callback = callback;
67+
this.bulkSize = Math.min(MIN_CACHE_SIZE, Math.max(bulkSize, MIN_BULK_SIZE));
68+
executorService = new ScheduledThreadPoolExecutor(1);
69+
executorService.scheduleWithFixedDelay(new Runnable() {
70+
@Override
71+
public void run() {
72+
if (timing) {
73+
flush();
74+
} else {
75+
if (buffer.size() >= bulkSize) {
76+
flush();
77+
}
78+
}
79+
}
80+
}, 1, Math.max(flushSec, 1), TimeUnit.SECONDS);
81+
}
82+
83+
@Override
84+
public void send(Map<String, Object> message) {
85+
if (!buffer.offer(message)) {
86+
List<Map<String, Object>> res = new ArrayList<Map<String, Object>>(1);
87+
res.add(message);
88+
callback.onFailed(new FailedData("can't offer to buffer.", res));
89+
}
90+
}
91+
92+
/**
93+
* This method don't need to be called actively.Because instance will create scheduled thread to do.
94+
*/
95+
@Override
96+
public void flush() {
97+
List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
98+
buffer.drainTo(results);
99+
if (results.isEmpty()) {
100+
return;
101+
}
102+
while (!results.isEmpty()) {
103+
String sendingData;
104+
List<Map<String, Object>> sendList = results.subList(0, Math.min(bulkSize, results.size()));
105+
try {
106+
sendingData = jsonMapper.writeValueAsString(sendList);
107+
} catch (JsonProcessingException e) {
108+
callback.onFailed(new FailedData("can't process json.", sendList));
109+
sendList.clear();
110+
continue;
111+
}
112+
try {
113+
this.httpConsumer.consume(sendingData);
114+
} catch (Exception e) {
115+
callback.onFailed(new FailedData("failed to send data.", sendList));
116+
}
117+
sendList.clear();
118+
}
119+
}
120+
121+
@Override
122+
public void close() {
123+
this.httpConsumer.close();
124+
this.executorService.shutdown();
125+
}
126+
}

SensorsAnalyticsSDK/src/main/java/com/sensorsdata/analytics/javasdk/consumer/HttpConsumer.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import com.sensorsdata.analytics.javasdk.SensorsConst;
44
import com.sensorsdata.analytics.javasdk.util.Base64Coder;
5+
56
import org.apache.http.NameValuePair;
7+
import org.apache.http.client.config.RequestConfig;
68
import org.apache.http.client.entity.UrlEncodedFormEntity;
79
import org.apache.http.client.methods.CloseableHttpResponse;
810
import org.apache.http.client.methods.HttpPost;
@@ -26,18 +28,37 @@ class HttpConsumer implements Closeable {
2628
final String serverUrl;
2729
final Map<String, String> httpHeaders;
2830
final boolean compressData;
31+
final RequestConfig requestConfig;
32+
33+
public HttpConsumer(String serverUrl, int timeoutSec) {
34+
this(serverUrl, null, timeoutSec);
35+
}
36+
37+
public HttpConsumer(String serverUrl, Map<String, String> httpHeaders) {
38+
this(serverUrl, httpHeaders, 3);
39+
}
2940

30-
HttpConsumer(String serverUrl, Map<String, String> httpHeaders) {
41+
HttpConsumer(String serverUrl, Map<String, String> httpHeaders, int timeoutSec) {
3142
this.serverUrl = serverUrl.trim();
3243
this.httpHeaders = httpHeaders;
3344
this.compressData = true;
45+
int timeout = timeoutSec * 1000;
46+
this.requestConfig = RequestConfig.custom().setConnectionRequestTimeout(timeout)
47+
.setConnectTimeout(timeout).setSocketTimeout(timeout).build();
48+
this.httpClient = HttpClients.custom()
49+
.setUserAgent(String.format("SensorsAnalytics Java SDK %s", SensorsConst.SDK_VERSION))
50+
.setDefaultRequestConfig(requestConfig)
51+
.build();
3452
}
3553

36-
synchronized void consume(final String data) throws IOException, HttpConsumerException {
54+
synchronized void consume(final String data) throws IOException, HttpConsumerException {
3755
HttpUriRequest request = getHttpRequest(data);
3856
CloseableHttpResponse response = null;
3957
if (httpClient == null) {
40-
httpClient = HttpClients.custom().setUserAgent("SensorsAnalytics Java SDK " + SensorsConst.SDK_VERSION).build();
58+
httpClient = HttpClients.custom()
59+
.setUserAgent(String.format("SensorsAnalytics Java SDK %s", SensorsConst.SDK_VERSION))
60+
.setDefaultRequestConfig(requestConfig)
61+
.build();
4162
}
4263
try {
4364
response = httpClient.execute(request);

0 commit comments

Comments
 (0)