Skip to content

Commit 60878aa

Browse files
committed
Bugfix: TunnelWorker heartbeat timeout cannot auto retry
1 parent f6c75eb commit 60878aa

File tree

6 files changed

+56
-148
lines changed

6 files changed

+56
-148
lines changed

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<modelVersion>4.0.0</modelVersion>
66
<groupId>com.aliyun.openservices</groupId>
77
<artifactId>tablestore</artifactId>
8-
<version>4.10.1-SNAPSHOT</version>
8+
<version>4.10.2-SNAPSHOT</version>
99
<packaging>jar</packaging>
1010
<name>AliCloud TableStore SDK for Java</name>
1111
<url>http://www.aliyun.com</url>
@@ -183,6 +183,7 @@
183183
<exclude>com.lmax:disruptor:jar:</exclude>
184184
<exclude>commons-codec:commons-codec:jar:</exclude>
185185
<exclude>org.hamcrest:hamcrest-core:jar:</exclude>
186+
<exclude>com.google.code.gson:gson:jar:</exclude>
186187
</excludes>
187188
</artifactSet>
188189
<relocations>

src/main/java/com/alicloud/openservices/tablestore/tunnel/worker/Checkpointer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void checkpoint(String token) {
5757
}
5858
}
5959
} catch (Exception e) {
60-
LOG.warn("Checkpoint occurs error, detail: {}", e);
60+
LOG.warn("Checkpoint occurs error, detail: {}", e.toString());
6161
}
6262
}
6363

src/main/java/com/alicloud/openservices/tablestore/tunnel/worker/DefaultChannelProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ public void process(ProcessRecordsInput input) {
3434
try {
3535
checkpointer.checkpoint(FINISH_TAG);
3636
} catch (Exception e) {
37-
LOG.error("checkpoint error, detail: {}", e);
37+
LOG.error("checkpoint error, detail: {}", e.toString());
3838
}
3939
} else if (System.currentTimeMillis() - latestCheckpoint > checkpointIntervalInMillis) {
4040
LOG.info("begin do checkpoint, token = {}", input.getNextToken());
4141
try {
4242
checkpointer.checkpoint(input.getNextToken());
4343
} catch (Exception e) {
44-
LOG.error("checkpoint error, detail: {}", e);
44+
LOG.error("checkpoint error, detail: {}", e.toString());
4545
}
4646
latestCheckpoint = System.currentTimeMillis();
4747
}

src/main/java/com/alicloud/openservices/tablestore/tunnel/worker/TunnelWorker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ public void run() {
149149
// 若Worker处于ENDED状态,则需要重新进行资源初始化。
150150
if (workerStatus.get().equals(TunnelWorkerStatus.WORKER_ENDED)) {
151151
workerStatus.set(TunnelWorkerStatus.WORKER_READY);
152+
lastHeartbeatTime = new Date();
152153
connect();
153154
}
154155
try {

src/test/java/com/alicloud/openservices/tablestore/tunnel/functiontest/TestTunnelWorker.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
package com.alicloud.openservices.tablestore.tunnel.functiontest;
22

33
import com.alicloud.openservices.tablestore.TunnelClient;
4+
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
5+
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
6+
import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelRequest;
7+
import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
8+
import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelResponse;
9+
import com.alicloud.openservices.tablestore.model.tunnel.TunnelStage;
10+
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
411
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
512
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
613
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
@@ -11,6 +18,7 @@ public class TestTunnelWorker {
1118
private static final String AccessId = "";
1219
private static final String AccessKey = "";
1320
private static final String InstanceName = "";
21+
private static final String TableName = "";
1422

1523
static class SimpleProcessor implements IChannelProcessor {
1624
@Override
@@ -35,23 +43,46 @@ public void shutdown() {
3543
public static void main(String[] args) {
3644
TunnelClient client = new TunnelClient(Endpoint, AccessId, AccessKey, InstanceName);
3745
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
38-
TunnelWorker worker = new TunnelWorker("480e3a3b-6672-46b1-b970-e611232dcd53", client, config);
46+
String tunnelName1 = "test-tunnel-1";
47+
CreateTunnelResponse createTunnelResponse =
48+
client.createTunnel(new CreateTunnelRequest(TableName, tunnelName1, TunnelType.BaseData));
49+
String tunnelId1 = createTunnelResponse.getTunnelId();
50+
TunnelWorker worker1 = new TunnelWorker(tunnelId1, client, config);
3951
try {
4052
System.out.println("worker running....");
41-
worker.connectAndWorking();
42-
Thread.sleep(50000);
43-
worker.shutdown();
44-
System.out.println("worker shutdown....");
53+
worker1.connectAndWorking();
54+
// 这里会等待一段时间,保证测试的全量可以消费完,具体实现时,可以后台启一个线程进行定期的查询和判断。
55+
Thread.sleep(120000);
56+
DescribeTunnelResponse resp = client.describeTunnel(new DescribeTunnelRequest(TableName, tunnelName1));
57+
if (TunnelStage.ProcessStream.equals(resp.getTunnelInfo().getStage())) {
58+
System.out.println("Base data consume finished, shutdown worker.");
59+
worker1.shutdown();
60+
System.out.println("Delete Tunnel " + tunnelId1);
61+
client.deleteTunnel(new DeleteTunnelRequest(TableName, tunnelName1));
62+
}
63+
System.out.println("worker1 shutdown....");
4564
} catch (Exception e) {
4665
e.printStackTrace();
47-
worker.shutdown();
66+
worker1.shutdown();
4867
}
4968

50-
// Reconstruct and connect.
51-
TunnelWorker worker2 = new TunnelWorker("480e3a3b-6672-46b1-b970-e611232dcd53", client, config);
69+
// 再创建一个新的Tunnel, 进行消费。
70+
String tunnelName2 = "test-tunnel-2";
71+
CreateTunnelResponse createTunnelResponse2 =
72+
client.createTunnel(new CreateTunnelRequest(TableName, tunnelName2, TunnelType.BaseData));
73+
String tunnelId2 = createTunnelResponse2.getTunnelId();
74+
TunnelWorker worker2 = new TunnelWorker(tunnelId2, client, config);
5275
try {
5376
System.out.println("worker2 running...");
5477
worker2.connectAndWorking();
78+
Thread.sleep(60000);
79+
80+
worker2.shutdown();
81+
client.deleteTunnel(new DeleteTunnelRequest(TableName, tunnelName2));
82+
83+
// 下面两个资源(内部都有线程池)只有在真正需要退出的时候,再shutdown。如果不小心shutdown的话,也要new一个新的出来。
84+
client.shutdown();
85+
config.shutdown();
5586
} catch (Exception e) {
5687
e.printStackTrace();
5788
worker2.shutdown();
Lines changed: 11 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,18 @@
11
package com.alicloud.openservices.tablestore.tunnel.functiontest;
22

3-
import java.io.FileInputStream;
4-
import java.util.ArrayList;
5-
import java.util.List;
6-
import java.util.Properties;
7-
8-
import com.alicloud.openservices.tablestore.SyncClient;
93
import com.alicloud.openservices.tablestore.TunnelClient;
10-
import com.alicloud.openservices.tablestore.model.Column;
11-
import com.alicloud.openservices.tablestore.model.ColumnValue;
12-
import com.alicloud.openservices.tablestore.model.CreateTableRequest;
13-
import com.alicloud.openservices.tablestore.model.DeleteTableRequest;
14-
import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
15-
import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
16-
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
17-
import com.alicloud.openservices.tablestore.model.PutRowRequest;
18-
import com.alicloud.openservices.tablestore.model.RowPutChange;
19-
import com.alicloud.openservices.tablestore.model.TableMeta;
20-
import com.alicloud.openservices.tablestore.model.TableOptions;
21-
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
22-
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
23-
import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelRequest;
24-
import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelResponse;
25-
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
264
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
275
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
286
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
297
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
30-
import org.junit.Assert;
31-
import org.junit.BeforeClass;
32-
import org.junit.Test;
338

349
public class TestTunnelWorkerReconnect {
35-
private static final String CONF_PATH = "tunnel.config";
36-
private static String endpoint = "";
37-
private static String accessId = "";
38-
private static String accessKey = "";
39-
private static String instanceName = "";
40-
41-
@BeforeClass
42-
public static void loadConfig() {
43-
System.out.println("here");
44-
try {
45-
Properties properties = new Properties();
46-
properties.load(new FileInputStream(CONF_PATH));
47-
endpoint = properties.getProperty("OtsEndpoint");
48-
accessId = properties.getProperty("AccessId");
49-
accessKey = properties.getProperty("AccessKey");
50-
instanceName = properties.getProperty("InstanceName");
51-
} catch (Exception e) {
52-
e.printStackTrace();
53-
}
54-
}
10+
private static final String Endpoint = "";
11+
private static final String AccessId = "";
12+
private static final String AccessKey = "";
13+
private static final String InstanceName = "";
5514

56-
class SimpleProcessor implements IChannelProcessor {
15+
static class SimpleProcessor implements IChannelProcessor {
5716
@Override
5817
public void process(ProcessRecordsInput input) {
5918
System.out.println("Default record processor, would print records count");
@@ -73,100 +32,16 @@ public void shutdown() {
7332
}
7433
}
7534

76-
static void createTunnel(TunnelClient client, String tableName, String tunnelName) {
77-
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.Stream);
78-
CreateTunnelResponse resp = client.createTunnel(request);
79-
System.out.println("RequestId: " + resp.getRequestId());
80-
System.out.println("TunnelId: " + resp.getTunnelId());
81-
}
82-
83-
static void createTable(SyncClient client, String tableName) {
84-
TableMeta meta = new TableMeta(tableName);
85-
meta.addPrimaryKeyColumn("int", PrimaryKeyType.INTEGER);
86-
meta.addPrimaryKeyColumn("str", PrimaryKeyType.STRING);
87-
CreateTableRequest request = new CreateTableRequest(meta, new TableOptions(-1, 1));
88-
client.createTable(request);
89-
90-
}
91-
92-
static void deleteTable(SyncClient client, String tableName) {
93-
DeleteTableRequest request = new DeleteTableRequest(tableName);
94-
client.deleteTable(request);
95-
}
96-
97-
static List<RowPutChange> putRows(SyncClient client, String tableName, int rowCount) {
98-
List<RowPutChange> changes = new ArrayList<RowPutChange>();
99-
for (int i = 0; i < rowCount; i++) {
100-
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
101-
primaryKeyBuilder.addPrimaryKeyColumn("int", PrimaryKeyValue.fromLong(i));
102-
primaryKeyBuilder.addPrimaryKeyColumn("str", PrimaryKeyValue.fromString("string" + i));
103-
RowPutChange rowPutChange = new RowPutChange(tableName, primaryKeyBuilder.build());
104-
105-
for (int j = 0; j < 10; j++) {
106-
rowPutChange.addColumn(new Column("test" + j, ColumnValue.fromLong(i)));
107-
}
108-
client.putRow(new PutRowRequest(rowPutChange));
109-
changes.add(rowPutChange);
110-
}
111-
System.out.println(String.format("Put %d rows succeed.", rowCount));
112-
return changes;
113-
}
114-
115-
static void deleteTunnel(TunnelClient client, String tableName, String tunnelName) {
116-
DeleteTunnelRequest request = new DeleteTunnelRequest(tableName, tunnelName);
117-
DeleteTunnelResponse resp = client.deleteTunnel(request);
118-
System.out.println("RequestId: " + resp.getRequestId());
119-
}
120-
121-
@Test
122-
public void testTunnelWorkerReconnect_WithTunnelInvalid() {
123-
TunnelClient tunnelClient = new TunnelClient(endpoint, accessId, accessKey, instanceName);
124-
SyncClient syncClient = new SyncClient(endpoint, accessId, accessKey, instanceName);
125-
126-
String tableName = "test_zr" + System.currentTimeMillis();
127-
String tunnelName = "test_zr" + System.currentTimeMillis();
128-
// 1. create table
129-
System.out.println("Begin Create Table: " + tableName);
130-
createTable(syncClient, tableName);
131-
System.out.println("++++++++++++++++++++++++++++++++++++");
132-
133-
// 2. create tunnel
134-
System.out.println("Begin Create Tunnel: " + tunnelName);
135-
CreateTunnelResponse resp = tunnelClient.createTunnel(
136-
new CreateTunnelRequest(tableName, tunnelName, TunnelType.Stream));
137-
String tunnelId = resp.getTunnelId();
138-
System.out.println("Create Tunnel, Id: " + tunnelId);
139-
System.out.println("++++++++++++++++++++++++++++++++++++");
140-
141-
// 3. put data
142-
System.out.println("Begin Put Data in backend.");
143-
putRows(syncClient, tableName, 5000);
144-
System.out.println("++++++++++++++++++++++++++++++++++++");
145-
146-
// 4. new tunnel worker and consume.
35+
public static void main(String[] args) {
36+
TunnelClient client = new TunnelClient(Endpoint, AccessId, AccessKey, InstanceName);
14737
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
148-
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
38+
TunnelWorker worker1 = new TunnelWorker("cfea9e1d-19f3-4835-8551-698a5c5c0773", client, config);
14939
try {
150-
worker.connectAndWorking();
151-
Thread.sleep(50000);
152-
System.out.println("Begin Delete Tunnel: " + tunnelName);
153-
deleteTunnel(tunnelClient, tableName, tunnelName);
154-
System.out.println("++++++++++++++++++++++++++++++++++++");
155-
Thread.sleep(50000);
156-
// cannot achieve here.
157-
Assert.fail();
40+
System.out.println("worker running....");
41+
worker1.connectAndWorking();
15842
} catch (Exception e) {
15943
e.printStackTrace();
160-
worker.shutdown();
161-
} finally {
162-
// delete table
163-
System.out.println("Begin Delete Table: " + tableName);
164-
deleteTable(syncClient, tableName);
165-
System.out.println("++++++++++++++++++++++++++++++++++++");
166-
worker.shutdown();
167-
syncClient.shutdown();
168-
tunnelClient.shutdown();
44+
worker1.shutdown();
16945
}
17046
}
171-
17247
}

0 commit comments

Comments
 (0)