Skip to content

Commit d25ceac

Browse files
committed
1. Add sample of ots writer.
2. Set reserved throught to 0 of all the table created in sample code. 3. Fix bug: ots writer won't exit as a executor thread pool is not shut down inside.
1 parent 7a049cb commit d25ceac

File tree

7 files changed

+243
-13
lines changed

7 files changed

+243
-13
lines changed

src/main/java/com/aliyun/openservices/ots/DefaultOTSWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public class DefaultOTSWriter implements OTSWriter {
4343

4444
private RowChangeEventHandler eventHandler;
4545

46+
private ExecutorService disruptorExecutor;
47+
4648
public DefaultOTSWriter(OTSAsync ots, String tableName, WriterConfig config, OTSCallback<RowChange, ConsumedCapacity> callback, Executor executor) {
4749
Preconditions.checkNotNull(ots, "The ots client can not be null.");
4850
Preconditions.checkArgument(tableName != null && !tableName.isEmpty(), "The table name can not be null or empty.");
@@ -70,7 +72,8 @@ private void initialize() {
7072
RowChangeEvent.RowChangeEventFactory factory = new RowChangeEvent.RowChangeEventFactory();
7173

7274
// start flush thread, we only need one event handler, so we just set a thread pool with fixed size 1.
73-
disruptor = new Disruptor<RowChangeEvent>(factory, writerConfig.getBufferSize(), Executors.newFixedThreadPool(1));
75+
disruptorExecutor = Executors.newFixedThreadPool(1);
76+
disruptor = new Disruptor<RowChangeEvent>(factory, writerConfig.getBufferSize(), disruptorExecutor);
7477
ringBuffer = disruptor.getRingBuffer();
7578
eventHandler = new RowChangeEventHandler(ots, writerConfig, callback, executor);
7679
disruptor.handleEventsWith(eventHandler);
@@ -190,5 +193,6 @@ public synchronized void close() {
190193
flushTimer.cancel();
191194
flush();
192195
disruptor.shutdown();
196+
disruptorExecutor.shutdown();
193197
}
194198
}

src/test/java/examples/OTSConditionalUpdateSample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ private static void createTable(OTSClient client, String tableName)
100100
TableMeta tableMeta = new TableMeta(tableName);
101101
tableMeta.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyType.INTEGER);
102102
tableMeta.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyType.INTEGER);
103-
// 将该表的读写CU都设置为100
104-
CapacityUnit capacityUnit = new CapacityUnit(100, 100);
103+
// 将该表的读写CU都设置为0
104+
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
105105

106106
CreateTableRequest request = new CreateTableRequest();
107107
request.setTableMeta(tableMeta);

src/test/java/examples/OTSMultiDataSample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ private static void createTable(OTSClient client, String tableName)
7777
TableMeta tableMeta = new TableMeta(tableName);
7878
tableMeta.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyType.INTEGER);
7979
tableMeta.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyType.INTEGER);
80-
// 将该表的读写CU都设置为100
81-
CapacityUnit capacityUnit = new CapacityUnit(100, 100);
80+
// 将该表的读写CU都设置为0
81+
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
8282

8383
CreateTableRequest request = new CreateTableRequest();
8484
request.setTableMeta(tableMeta);

src/test/java/examples/OTSMultiTableOperationSample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ private static void createTable(OTSClient client, String tableName)
9494
TableMeta tableMeta = new TableMeta(tableName);
9595
tableMeta.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyType.INTEGER);
9696
tableMeta.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyType.STRING);
97-
// 将该表的读写CU都设置为100
98-
CapacityUnit capacityUnit = new CapacityUnit(100, 100);
97+
// 将该表的读写CU都设置为0
98+
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
9999

100100
CreateTableRequest request = new CreateTableRequest();
101101
request.setTableMeta(tableMeta);

src/test/java/examples/OTSSingleDataSample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ private static void createTable(OTSClient client, String tableName)
104104
TableMeta tableMeta = new TableMeta(tableName);
105105
tableMeta.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyType.INTEGER);
106106
tableMeta.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyType.INTEGER);
107-
// 将该表的读写CU都设置为100
108-
CapacityUnit capacityUnit = new CapacityUnit(100, 100);
107+
// 将该表的读写CU都设置为0
108+
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
109109

110110
CreateTableRequest request = new CreateTableRequest();
111111
request.setTableMeta(tableMeta);

src/test/java/examples/OTSTableOperationSample.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ private static void createTable(OTSClient client, String tableName)
7373
TableMeta tableMeta = new TableMeta(tableName);
7474
tableMeta.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyType.INTEGER);
7575
tableMeta.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyType.INTEGER);
76-
// 将该表的读写CU都设置为100
77-
CapacityUnit capacityUnit = new CapacityUnit(100, 100);
76+
// 将该表的读写CU都设置为0
77+
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
7878

7979
CreateTableRequest request = new CreateTableRequest();
8080
request.setTableMeta(tableMeta);
@@ -114,8 +114,8 @@ private static void updateTable(OTSClient client, String tableName)
114114
UpdateTableRequest request = new UpdateTableRequest();
115115
request.setTableName(tableName);
116116
ReservedThroughputChange cuChange = new ReservedThroughputChange();
117-
cuChange.setReadCapacityUnit(50); // 若想单独调整写CU,则无须设置读CU
118-
cuChange.setWriteCapacityUnit(50); // 若想单独调整读CU,则无须设置写CU
117+
cuChange.setReadCapacityUnit(1); // 若想单独调整写CU,则无须设置读CU
118+
cuChange.setWriteCapacityUnit(1); // 若想单独调整读CU,则无须设置写CU
119119
request.setReservedThroughputChange(cuChange);
120120
UpdateTableResult result = client.updateTable(request);
121121

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package examples;
2+
3+
import com.aliyun.openservices.ots.*;
4+
import com.aliyun.openservices.ots.internal.OTSAlwaysRetryStrategy;
5+
import com.aliyun.openservices.ots.internal.OTSCallback;
6+
import com.aliyun.openservices.ots.internal.OTSDefaultRetryStrategy;
7+
import com.aliyun.openservices.ots.internal.writer.WriterConfig;
8+
import com.aliyun.openservices.ots.model.*;
9+
10+
import java.util.ArrayList;
11+
import java.util.Iterator;
12+
import java.util.List;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicLong;
18+
19+
public class OTSWriterSample {
20+
private static final String COLUMN_GID_NAME = "gid";
21+
private static final String COLUMN_UID_NAME = "uid";
22+
final private static String endPoint = "";
23+
final private static String accessId = "";
24+
final private static String accessKey = "";
25+
final private static String instanceName = "";
26+
27+
28+
public static void main(String args[]) {
29+
30+
OTSClient client = new OTSClient(endPoint, accessId, accessKey,
31+
instanceName);
32+
final String tableName = "sampleTable";
33+
34+
try {
35+
// 创建表
36+
createTable(client, tableName);
37+
38+
writeRow(tableName);
39+
40+
scanTable(client, tableName);
41+
42+
deleteTable(client, tableName);
43+
44+
} catch (ServiceException e) {
45+
System.err.println("操作失败,详情:" + e.getMessage());
46+
// 可以根据错误代码做出处理, OTS的ErrorCode定义在OTSErrorCode中。
47+
if (OTSErrorCode.QUOTA_EXHAUSTED.equals(e.getErrorCode())) {
48+
System.err.println("超出存储配额。");
49+
}
50+
// Request ID可以用于有问题时联系客服诊断异常。
51+
System.err.println("Request ID:" + e.getRequestId());
52+
} catch (ClientException e) {
53+
// 可能是网络不好或者是返回结果有问题
54+
System.err.println("请求失败,详情:" + e.getMessage());
55+
} catch (InterruptedException e) {
56+
System.err.print(e);
57+
}
58+
client.shutdown();
59+
}
60+
61+
private static void scanTable(OTSClient client, String tableName) {
62+
RangeIteratorParameter param = new RangeIteratorParameter(tableName);
63+
RowPrimaryKey startKey = new RowPrimaryKey();
64+
startKey.addPrimaryKeyColumn("uid", PrimaryKeyValue.INF_MIN);
65+
startKey.addPrimaryKeyColumn("gid", PrimaryKeyValue.INF_MIN);
66+
67+
RowPrimaryKey endKey = new RowPrimaryKey();
68+
endKey.addPrimaryKeyColumn("uid", PrimaryKeyValue.INF_MAX);
69+
endKey.addPrimaryKeyColumn("gid", PrimaryKeyValue.INF_MAX);
70+
71+
param.setInclusiveStartPrimaryKey(startKey);
72+
param.setExclusiveEndPrimaryKey(endKey);
73+
74+
Iterator<Row> rowIter = client.createRangeIterator(param);
75+
int totalCount = 0;
76+
while (rowIter.hasNext()) {
77+
rowIter.next();
78+
totalCount++;
79+
}
80+
81+
System.out.println("TotalRows in table: " + totalCount);
82+
}
83+
84+
private static void createTable(OTSClient client, String tableName)
85+
throws ServiceException, ClientException {
86+
TableMeta tableMeta = new TableMeta(tableName);
87+
tableMeta.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyType.INTEGER);
88+
tableMeta.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyType.INTEGER);
89+
// 将该表的读写CU都设置为100
90+
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
91+
92+
CreateTableRequest request = new CreateTableRequest();
93+
request.setTableMeta(tableMeta);
94+
request.setReservedThroughput(capacityUnit);
95+
client.createTable(request);
96+
97+
System.out.println("表已创建");
98+
}
99+
100+
private static void deleteTable(OTSClient client, String tableName)
101+
throws ServiceException, ClientException {
102+
DeleteTableRequest request = new DeleteTableRequest();
103+
request.setTableName(tableName);
104+
client.deleteTable(request);
105+
106+
System.out.println("表已删除");
107+
}
108+
109+
private static void writeRow(String tableName) throws InterruptedException {
110+
ClientConfiguration cc = new ClientConfiguration();
111+
OTSServiceConfiguration osc = new OTSServiceConfiguration();
112+
osc.setRetryStrategy(new OTSDefaultRetryStrategy()); // 可定制重试策略,若需要保证数据写入成功率,可采用更激进的重试策略
113+
OTSClientAsync asyncClient = new OTSClientAsync(endPoint, accessId, accessKey, instanceName, cc, osc);
114+
115+
// 初始化
116+
WriterConfig config = new WriterConfig();
117+
config.setMaxBatchSize(1024 * 1024); // 配置一次批量导入请求的大小限制,默认是1MB
118+
config.setMaxColumnsCount(128); // 配置一行的列数的上限,默认128列
119+
config.setBufferSize(1024); // 配置内存中最多缓冲的数据行数,默认1024行,必须是2的指数倍
120+
config.setMaxBatchRowsCount(100); // 配置一次批量导入的行数上限,默认100
121+
config.setConcurrency(10); // 配置最大并发数,默认10
122+
config.setMaxAttrColumnSize(64 * 1024); // 配置属性列的值大小上限,默认是64KB
123+
config.setMaxPKColumnSize(1024); // 配置主键列的值大小上限,默认1KB
124+
config.setFlushInterval(10000); // 配置缓冲区flush的时间间隔,默认10s
125+
126+
// 配置一个callback,OTSWriter通过该callback反馈哪些导入成功,哪些行导入失败,该callback只简单的统计写入成功和失败的行数。
127+
128+
AtomicLong succeedCount = new AtomicLong();
129+
AtomicLong failedCount = new AtomicLong();
130+
OTSCallback<RowChange, ConsumedCapacity> callback = new SampleCallback(succeedCount, failedCount);
131+
ExecutorService executor = Executors.newFixedThreadPool(2);
132+
OTSWriter otsWriter = new DefaultOTSWriter(asyncClient, tableName, config, callback, executor);
133+
134+
// 起多个线程,并发的导入数据
135+
int threadCount = 10;
136+
int rowsCount = 10000;
137+
List<Thread> threads = new ArrayList<Thread>();
138+
for (int i = 0; i < threadCount; i++) {
139+
WriteRow writeRow = new WriteRow(i, tableName, otsWriter, rowsCount);
140+
threads.add(new Thread(writeRow));
141+
}
142+
143+
for (Thread thread : threads) {
144+
thread.start();
145+
}
146+
147+
for (Thread thread : threads) {
148+
thread.join();
149+
}
150+
151+
// 所有压力线程完成后,等待OTSWriter将数据导入完毕
152+
otsWriter.flush();
153+
154+
// 所有数据导入完毕后,需要显式的将OTSWriter给close掉
155+
otsWriter.close();
156+
157+
// 最终关闭ots client
158+
asyncClient.shutdown();
159+
160+
// 关闭executor thread pool
161+
executor.shutdown();
162+
executor.awaitTermination(10, TimeUnit.SECONDS);
163+
164+
System.out.println("成功导入行数: " + succeedCount);
165+
System.out.println("失败导入行数: " + failedCount);
166+
}
167+
168+
private static class WriteRow implements Runnable {
169+
private int id;
170+
private String tableName;
171+
private OTSWriter writer;
172+
private int rowsCount;
173+
174+
public WriteRow(int id, String tableName, OTSWriter writer, int rowsCount) {
175+
this.id = id;
176+
this.tableName = tableName;
177+
this.writer = writer;
178+
this.rowsCount = rowsCount;
179+
}
180+
181+
@Override
182+
public void run() {
183+
int start = id * rowsCount;
184+
for (int i = 0; i < rowsCount; i++) {
185+
RowPrimaryKey primaryKey = new RowPrimaryKey();
186+
primaryKey.addPrimaryKeyColumn("gid", PrimaryKeyValue.fromLong(start + i));
187+
primaryKey.addPrimaryKeyColumn("uid", PrimaryKeyValue.fromLong(start + i));
188+
189+
RowPutChange rowChange = new RowPutChange(tableName);
190+
rowChange.setPrimaryKey(primaryKey);
191+
rowChange.addAttributeColumn("col1", ColumnValue.fromBoolean(true));
192+
rowChange.addAttributeColumn("col2", ColumnValue.fromLong(10));
193+
rowChange.addAttributeColumn("col3", ColumnValue.fromString("Hello world."));
194+
195+
writer.addRowChange(rowChange);
196+
}
197+
}
198+
}
199+
200+
private static class SampleCallback implements OTSCallback<RowChange, ConsumedCapacity> {
201+
private AtomicLong succeedCount;
202+
private AtomicLong failedCount;
203+
204+
public SampleCallback(AtomicLong succeedCount, AtomicLong failedCount) {
205+
this.succeedCount = succeedCount;
206+
this.failedCount = failedCount;
207+
}
208+
209+
@Override
210+
public void onCompleted(OTSContext<RowChange, ConsumedCapacity> otsContext) {
211+
succeedCount.incrementAndGet();
212+
}
213+
214+
@Override
215+
public void onFailed(OTSContext<RowChange, ConsumedCapacity> otsContext, OTSException ex) {
216+
ex.printStackTrace();
217+
failedCount.incrementAndGet();
218+
}
219+
220+
@Override
221+
public void onFailed(OTSContext<RowChange, ConsumedCapacity> otsContext, ClientException ex) {
222+
ex.printStackTrace();
223+
failedCount.incrementAndGet();
224+
}
225+
}
226+
}

0 commit comments

Comments
 (0)