Skip to content

Commit 185b9a3

Browse files
committed
Add sample code for asyn interface.
1 parent a6f0c44 commit 185b9a3

File tree

1 file changed

+293
-0
lines changed

1 file changed

+293
-0
lines changed
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
package examples;
2+
3+
import com.aliyun.openservices.ots.*;
4+
import com.aliyun.openservices.ots.internal.OTSCallback;
5+
import com.aliyun.openservices.ots.model.*;
6+
import com.aliyun.openservices.ots.model.condition.RelationalCondition;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
public class OTSAsyncSample {
13+
14+
private static final String COLUMN_GID_NAME = "gid";
15+
private static final String COLUMN_UID_NAME = "uid";
16+
private static final String COLUMN_NAME_NAME = "name";
17+
private static final String COLUMN_AGE_NAME = "age";
18+
19+
public static void main(String args[]) {
20+
final String endPoint = "";
21+
final String accessId = "";
22+
final String accessKey = "";
23+
final String instanceName = "";
24+
25+
OTSClient client = new OTSClient(endPoint, accessId, accessKey, instanceName);
26+
OTSClientAsync asyncClient = new OTSClientAsync(endPoint, accessId, accessKey, instanceName);
27+
final String tableName = "sampleTable";
28+
29+
try{
30+
// 创建表
31+
createTable(client, tableName);
32+
33+
// 注意:创建表只是提交请求,OTS创建表需要一段时间。
34+
// 这里简单地等待2秒,请根据您的实际逻辑修改。
35+
Thread.sleep(2000);
36+
37+
listTableWithFuture(asyncClient);
38+
listTableWithCallback(asyncClient);
39+
40+
// 异步并发的执行多次batchWriteRow操作
41+
batchWriteRow(asyncClient, tableName);
42+
43+
// 异步并发的执行多次getRange操作
44+
batchGetRange(asyncClient, tableName);
45+
}catch(ServiceException e){
46+
System.err.println("操作失败,详情:" + e.getMessage());
47+
// 可以根据错误代码做出处理, OTS的ErrorCode定义在OTSErrorCode中。
48+
if (OTSErrorCode.QUOTA_EXHAUSTED.equals(e.getErrorCode())){
49+
System.err.println("超出存储配额。");
50+
}
51+
// Request ID可以用于有问题时联系客服诊断异常。
52+
System.err.println("Request ID:" + e.getRequestId());
53+
}catch(ClientException e){
54+
// 可能是网络不好或者是返回结果有问题
55+
System.err.println("请求失败,详情:" + e.getMessage());
56+
} catch (InterruptedException e) {
57+
System.err.println(e.getMessage());
58+
}
59+
finally{
60+
// 不留垃圾。
61+
try {
62+
deleteTable(client, tableName);
63+
} catch (ServiceException e) {
64+
System.err.println("删除表格失败,原因:" + e.getMessage());
65+
e.printStackTrace();
66+
} catch (ClientException e) {
67+
System.err.println("删除表格请求失败,原因:" + e.getMessage());
68+
e.printStackTrace();
69+
}
70+
client.shutdown();
71+
asyncClient.shutdown();
72+
}
73+
}
74+
75+
private static OTSFuture<GetRangeResult> sendGetRangeRequest(OTSClientAsync asyncClient, String tableName, long start, long end) {
76+
RowPrimaryKey startPk = new RowPrimaryKey();
77+
startPk.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyValue.fromLong(start));
78+
startPk.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyValue.INF_MIN);
79+
80+
RowPrimaryKey endPk = new RowPrimaryKey();
81+
endPk.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyValue.fromLong(end));
82+
endPk.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyValue.INF_MIN);
83+
84+
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria(tableName);
85+
criteria.setInclusiveStartPrimaryKey(startPk);
86+
criteria.setExclusiveEndPrimaryKey(endPk);
87+
criteria.setLimit(10);
88+
89+
GetRangeRequest request = new GetRangeRequest();
90+
request.setRangeRowQueryCriteria(criteria);
91+
OTSFuture<GetRangeResult> future = asyncClient.getRange(request);
92+
return future;
93+
}
94+
95+
private static void batchGetRange(OTSClientAsync asyncClient, String tableName) {
96+
// 一次性查询多个范围的数据,设置10个任务,每个任务查询100条数据。
97+
// 每个范围查询的时候设置limit为10,100条数据需要10次请求才能全部查完。
98+
int count = 10;
99+
OTSFuture<GetRangeResult>[] futures = new OTSFuture[count];
100+
for (int i = 0; i < count; i++) {
101+
futures[i] = sendGetRangeRequest(asyncClient, tableName, i * 100, i * 100 + 100);
102+
}
103+
104+
// 检查是否所有范围查询均已做完,若未做完,则继续发送查询请求
105+
List<Row> allRows = new ArrayList<Row>();
106+
while (true) {
107+
boolean completed = true;
108+
for (int i = 0; i < futures.length; i++) {
109+
OTSFuture<GetRangeResult> future = futures[i];
110+
if (future == null) {
111+
continue;
112+
}
113+
114+
if (future.isDone()) {
115+
GetRangeResult result = future.get();
116+
allRows.addAll(result.getRows());
117+
118+
if (result.getNextStartPrimaryKey() != null) {
119+
// 该范围还未查询完毕,需要从nextStart开始继续往下读。
120+
long nextStart = result.getNextStartPrimaryKey().getPrimaryKey().get(COLUMN_GID_NAME).asLong();
121+
long rangeEnd = i * 100 + 100;
122+
futures[i] = sendGetRangeRequest(asyncClient, tableName, nextStart, rangeEnd);
123+
completed = false;
124+
} else {
125+
futures[i] = null; // 若某个范围查询完毕,则将对应future设置为null
126+
}
127+
} else {
128+
completed = false;
129+
}
130+
}
131+
132+
if (completed) {
133+
break;
134+
} else {
135+
try {
136+
Thread.sleep(10); // 避免busy wait,每次循环完毕后等待一小段时间
137+
} catch (InterruptedException e) {
138+
e.printStackTrace();
139+
}
140+
}
141+
}
142+
143+
// 所有数据全部读出
144+
System.out.println("Total rows scanned: " + allRows.size());
145+
}
146+
147+
private static void batchWriteRow(OTSClientAsync asyncClient, String tableName) {
148+
// BatchWriteRow的行数限制是100行,使用异步接口,实现一次批量导入1000行。
149+
List<OTSFuture<BatchWriteRowResult>> futures = new ArrayList<OTSFuture<BatchWriteRowResult>>();
150+
int count = 10;
151+
// 一次性发出10个请求,每个请求写100行数据
152+
for (int i = 0; i < count; i++) {
153+
BatchWriteRowRequest request = new BatchWriteRowRequest();
154+
for (int j = 0; j < 100; j++) {
155+
RowPutChange rowChange = new RowPutChange(tableName);
156+
RowPrimaryKey primaryKey = new RowPrimaryKey();
157+
primaryKey.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyValue.fromLong(i * 100 + j));
158+
primaryKey.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyValue.fromLong(j));
159+
rowChange.setPrimaryKey(primaryKey);
160+
rowChange.addAttributeColumn(COLUMN_NAME_NAME, ColumnValue.fromString("name" + j));
161+
rowChange.addAttributeColumn(COLUMN_AGE_NAME, ColumnValue.fromLong(j));
162+
163+
request.addRowChange(rowChange);
164+
}
165+
OTSFuture<BatchWriteRowResult> result = asyncClient.batchWriteRow(request);
166+
futures.add(result);
167+
}
168+
169+
// 等待结果返回
170+
List<BatchWriteRowResult> results = new ArrayList<BatchWriteRowResult>();
171+
for (OTSFuture<BatchWriteRowResult> future : futures) {
172+
try {
173+
BatchWriteRowResult result = future.get(); // 同步等待结果返回
174+
results.add(result);
175+
} catch (OTSException e) {
176+
e.printStackTrace();
177+
} catch (ClientException e) {
178+
e.printStackTrace();
179+
}
180+
}
181+
182+
// 统计返回结果
183+
int totalSucceedRows = 0;
184+
int totalFailedRows = 0;
185+
for (BatchWriteRowResult result : results) {
186+
totalSucceedRows += result.getSucceedRowsOfPut().size();
187+
totalFailedRows += result.getFailedRowsOfPut().size();
188+
}
189+
190+
System.out.println("Total succeed rows: " + totalSucceedRows);
191+
System.out.println("Total failed rows: " + totalFailedRows);
192+
}
193+
194+
private static void listTableWithCallback(OTSClientAsync asyncClient) {
195+
final AtomicBoolean isDone = new AtomicBoolean(false);
196+
OTSCallback<ListTableRequest, ListTableResult> callback = new OTSCallback<ListTableRequest, ListTableResult>() {
197+
@Override
198+
public void onCompleted(OTSContext<ListTableRequest, ListTableResult> otsContext) {
199+
isDone.set(true);
200+
System.out.println("\nList table by listTableWithCallback:");
201+
for (String tableName : otsContext.getOTSResult().getTableNames()) {
202+
System.out.println(tableName);
203+
}
204+
}
205+
206+
@Override
207+
public void onFailed(OTSContext<ListTableRequest, ListTableResult> otsContext, OTSException ex) {
208+
isDone.set(true);
209+
ex.printStackTrace();
210+
}
211+
212+
@Override
213+
public void onFailed(OTSContext<ListTableRequest, ListTableResult> otsContext, ClientException ex) {
214+
isDone.set(true);
215+
ex.printStackTrace();
216+
}
217+
};
218+
219+
asyncClient.listTable(callback); // 将callback扔给SDK,SDK在完成请求接到响应后,会自动调用callback
220+
221+
// 等待callback被调用,一般的业务处理逻辑下,不需要这一步等待。
222+
while (!isDone.get()) {
223+
try {
224+
Thread.sleep(10);
225+
} catch (InterruptedException e) {
226+
e.printStackTrace();
227+
}
228+
}
229+
}
230+
231+
private static void listTableWithFuture(OTSClientAsync client) {
232+
// 通过Future同步的等待结果返回。
233+
try {
234+
OTSFuture<ListTableResult> future = client.listTable();
235+
ListTableResult result = future.get(); // 同步的等待
236+
System.out.println("\nList table by listTableWithFuture:");
237+
for (String tableName : result.getTableNames()) {
238+
System.out.println(tableName);
239+
}
240+
} catch (OTSException e) {
241+
e.printStackTrace();
242+
} catch (ClientException e) {
243+
e.printStackTrace();
244+
}
245+
246+
// 通过Future,间歇性的等待结果返回。
247+
try {
248+
OTSFuture<ListTableResult> future = client.listTable();
249+
250+
while (!future.isDone()) {
251+
System.out.println("Waiting for result of list table.");
252+
Thread.sleep(10); // 每隔10ms检查结果是否返回
253+
}
254+
255+
ListTableResult result = future.get();
256+
System.out.println("\nList table by listTableWithFuture:");
257+
for (String tableName : result.getTableNames()) {
258+
System.out.println(tableName);
259+
}
260+
} catch (OTSException e) {
261+
e.printStackTrace();
262+
} catch (ClientException e) {
263+
e.printStackTrace();
264+
} catch (InterruptedException e) {
265+
e.printStackTrace();
266+
}
267+
}
268+
269+
private static void createTable(OTSClient client, String tableName)
270+
throws ServiceException, ClientException{
271+
TableMeta tableMeta = new TableMeta(tableName);
272+
tableMeta.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyType.INTEGER);
273+
tableMeta.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyType.INTEGER);
274+
// 将该表的读写CU都设置为0
275+
CapacityUnit capacityUnit = new CapacityUnit(0, 0);
276+
277+
CreateTableRequest request = new CreateTableRequest();
278+
request.setTableMeta(tableMeta);
279+
request.setReservedThroughput(capacityUnit);
280+
client.createTable(request);
281+
282+
System.out.println("表已创建");
283+
}
284+
285+
private static void deleteTable(OTSClient client, String tableName)
286+
throws ServiceException, ClientException{
287+
DeleteTableRequest request = new DeleteTableRequest();
288+
request.setTableName(tableName);
289+
client.deleteTable(request);
290+
291+
System.out.println("表已删除");
292+
}
293+
}

0 commit comments

Comments
 (0)