Skip to content

Commit ab76929

Browse files
杨恋杨恋
authored andcommitted
add timestream
1 parent 9ecf499 commit ab76929

File tree

93 files changed

+10444
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+10444
-1
lines changed

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<modelVersion>4.0.0</modelVersion>
66
<groupId>com.aliyun.openservices</groupId>
77
<artifactId>tablestore</artifactId>
8-
<version>4.10.2-SNAPSHOT</version>
8+
<version>4.11.0-SNAPSHOT</version>
99
<packaging>jar</packaging>
1010
<name>AliCloud TableStore SDK for Java</name>
1111
<url>http://www.aliyun.com</url>
@@ -86,6 +86,11 @@
8686
<artifactId>gson</artifactId>
8787
<version>2.8.5</version>
8888
</dependency>
89+
<dependency>
90+
<groupId>com.google.guava</groupId>
91+
<artifactId>guava</artifactId>
92+
<version>27.0.1-jre</version>
93+
</dependency>
8994
</dependencies>
9095
<build>
9196
<plugins>
@@ -184,6 +189,7 @@
184189
<exclude>commons-codec:commons-codec:jar:</exclude>
185190
<exclude>org.hamcrest:hamcrest-core:jar:</exclude>
186191
<exclude>com.google.code.gson:gson:jar:</exclude>
192+
<exclude>com.google.guava:guava:jar:</exclude>
187193
</excludes>
188194
</artifactSet>
189195
<relocations>
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.alicloud.openservices.tablestore.timestream;
2+
3+
import com.alicloud.openservices.tablestore.timestream.model.AttributeIndexSchema;
4+
5+
import java.util.List;
6+
7+
public interface TimestreamDB {
8+
9+
/**
10+
* 关闭client,释放资源
11+
* <p>请确保在所有请求执行完毕之后释放资源。释放资源之后将不能再发送请求,正在执行的请求可能无法返回结果。</p>
12+
*/
13+
public void close();
14+
15+
/**
16+
* 创建meta表,不为attributes创建索引
17+
*/
18+
public void createMetaTable();
19+
20+
/**
21+
* 创建meta表,为指定的attributes创建索引
22+
* <p>attribute不能为保留字段(h、n、t、s)</p>
23+
*/
24+
public void createMetaTable(List<AttributeIndexSchema> indexForAttributes);
25+
26+
/**
27+
* 删除meta表
28+
*/
29+
public void deleteMetaTable();
30+
31+
/**
32+
* 创建数据表
33+
* @param tableName 数据表表名
34+
*/
35+
public void createDataTable(String tableName);
36+
37+
/**
38+
* 删除数据表
39+
* @param tableName 数据表表名
40+
*/
41+
public void deleteDataTable(String tableName);
42+
43+
/**
44+
* 获取meta表的操作对象
45+
* @return
46+
*/
47+
public TimestreamMetaTable metaTable();
48+
49+
/**
50+
* 获取数据表的操作对象
51+
* @param tableName 数据表表名
52+
* @return
53+
*/
54+
public TimestreamDataTable dataTable(String tableName);
55+
}
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
package com.alicloud.openservices.tablestore.timestream;
2+
3+
import com.alicloud.openservices.tablestore.*;
4+
import com.alicloud.openservices.tablestore.model.*;
5+
import com.alicloud.openservices.tablestore.model.search.*;
6+
import com.alicloud.openservices.tablestore.writer.WriterConfig;
7+
import com.alicloud.openservices.tablestore.timestream.internal.MetaCacheManager;
8+
import com.alicloud.openservices.tablestore.timestream.internal.TableMetaGenerator;
9+
import com.alicloud.openservices.tablestore.timestream.internal.Utils;
10+
import com.alicloud.openservices.tablestore.timestream.model.AttributeIndexSchema;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import java.util.*;
15+
import java.util.concurrent.*;
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
18+
/**
19+
* {@link TimestreamDBClient}定义,提供删建表,以及数据读写功能。
20+
* <p>后台默认打开自动更新时间线updateTime功能,可以通过{@link TimestreamDBConfiguration#enableDumpMeta}选择关闭 </p>
21+
* <p>当后台打开自动更新时间线updateTime功能时({@link TimestreamDBConfiguration#enableDumpMeta}),TimestreamDBClient后台会维护一个内存缓存最近更新过的时间线。数据写入时,会判断该时间线是否需要更新updateTime(缓存中没有或者上次更新时间线超过设置的阈值),
22+
* 如果需要更新则往meta表中插入一条记录(只更新updateTime)。</p>
23+
* <p>数据写入的异步api是通过{@link TableStoreWriter}来实现的,如果需要获取异步写入的结果,可以传入{@link TableStoreCallback},该callback是所有数据表写入共用的</p>
24+
*/
25+
public class TimestreamDBClient implements TimestreamDB {
26+
private Logger logger = LoggerFactory.getLogger(TimestreamDBClient.class);
27+
28+
private TimestreamDBConfiguration config;
29+
private WriterConfig writerConfig;
30+
31+
private String metaTableName;
32+
private String indexName;
33+
34+
private ExecutorService executor;
35+
36+
private AsyncClient asyncClient;
37+
private TableStoreWriter metaWriter;
38+
private MetaCacheManager metaCacheManager;
39+
private Map<String, TimestreamDataTable> dataTableMap = new HashMap<String, TimestreamDataTable>();
40+
private TableStoreCallback<RowChange, ConsumedCapacity> callback;
41+
private AtomicBoolean closed = new AtomicBoolean(false);
42+
private boolean writeMeta;
43+
44+
/**
45+
* TimestreamDBClient的构造函数
46+
* @param asyncClient TableStore异步client
47+
* @param config client配置
48+
*/
49+
public TimestreamDBClient(AsyncClient asyncClient, TimestreamDBConfiguration config) {
50+
this(asyncClient, config, new WriterConfig(), null);
51+
}
52+
53+
/**
54+
* TimestreamDBClient的构造函数
55+
* @param asyncClient TableStore异步client
56+
* @param config client配置
57+
* @param writerConfig 所有数据表使用的TableStoreWriter的{@link WriterConfig}
58+
* @param callback 所有数据表使用的TableStoreWriter共用的{@link TableStoreCallback}
59+
*/
60+
public TimestreamDBClient(
61+
AsyncClient asyncClient,
62+
TimestreamDBConfiguration config,
63+
WriterConfig writerConfig,
64+
TableStoreCallback<RowChange, ConsumedCapacity> callback) {
65+
this.asyncClient = asyncClient;
66+
this.config = config;
67+
this.writerConfig = writerConfig;
68+
this.callback = callback;
69+
this.metaTableName = this.config.getMetaTableName();
70+
this.indexName = this.metaTableName + "_INDEX";
71+
this.writeMeta = config.getDumpMeta();
72+
73+
executor = Executors.newFixedThreadPool(this.config.getThreadNumForWriter());
74+
try {
75+
tryInitMetaWriter();
76+
} catch (TableStoreException e) {
77+
logger.warn("Failed to init meta writer:" + e.getMessage());
78+
} catch (ClientException e) {
79+
logger.warn("Failed to init meta writer:" + e.toString());
80+
}
81+
logger.info("End initialize client");
82+
}
83+
84+
@Override
85+
public synchronized void close() {
86+
if (closed.get()) {
87+
throw new ClientException("The client has already been closed.");
88+
}
89+
if (this.metaCacheManager != null) {
90+
this.metaCacheManager.close();
91+
}
92+
if (this.metaWriter != null) {
93+
this.metaWriter.close();
94+
}
95+
for (TimestreamDataTable dataTable : this.dataTableMap.values()) {
96+
dataTable.close();
97+
}
98+
this.asyncClient.shutdown();
99+
this.executor.shutdown();
100+
closed.set(true);
101+
}
102+
103+
private void tryInitMetaWriter() {
104+
if (!this.writeMeta) {
105+
return;
106+
}
107+
if (this.metaWriter == null) {
108+
synchronized (this) {
109+
if (this.metaWriter == null) {
110+
logger.info("Begin to init meta writer");
111+
checkMetaTableExist();
112+
checkIndexMetaExist();
113+
this.metaWriter = new DefaultTableStoreWriter(
114+
this.asyncClient,
115+
this.metaTableName,
116+
this.writerConfig, null,
117+
executor);
118+
this.metaCacheManager = new MetaCacheManager(
119+
this.metaTableName,
120+
this.config.getIntervalDumpMeta(TimeUnit.SECONDS),
121+
this.config.getMetaCacheSize(),
122+
this.metaWriter);
123+
logger.info("End to init meta writer");
124+
}
125+
}
126+
}
127+
}
128+
129+
@Override
130+
public void createMetaTable() {
131+
this.createMetaTable(null);
132+
}
133+
134+
@Override
135+
public void createMetaTable(List<AttributeIndexSchema> indexForAttributes) {
136+
if (indexForAttributes != null) {
137+
for (AttributeIndexSchema schema : indexForAttributes) {
138+
String name = schema.getFieldName();
139+
if (name.equals(TableMetaGenerator.CN_PK0) ||
140+
name.equals(TableMetaGenerator.CN_PK1) ||
141+
name.equals(TableMetaGenerator.CN_PK2) ||
142+
name.equals(TableMetaGenerator.CN_TAMESTAMP_NAME)) {
143+
throw new ClientException("Name of attribute for indexes cannot be " +
144+
TableMetaGenerator.CN_PK0 + "/" +
145+
TableMetaGenerator.CN_PK1 + "/" +
146+
TableMetaGenerator.CN_PK2 + "/" +
147+
TableMetaGenerator.CN_TAMESTAMP_NAME + ".");
148+
}
149+
}
150+
}
151+
TableMeta tableMeta = TableMetaGenerator.getMetaTableMeta(this.metaTableName);
152+
TableOptions tableOptions = new TableOptions();
153+
CreateTableRequest request = new CreateTableRequest(
154+
tableMeta, tableOptions);
155+
tableOptions.setMaxVersions(1);
156+
tableOptions.setTimeToLive(-1);
157+
Future<CreateTableResponse> res = this.asyncClient.createTable(request, null);
158+
Utils.waitForFuture(res);
159+
createSearchIndexForMeta(indexForAttributes);
160+
161+
tryInitMetaWriter();
162+
}
163+
164+
private void createSearchIndexForMeta(List<AttributeIndexSchema> indexForAttributes) {
165+
CreateSearchIndexRequest request = new CreateSearchIndexRequest(
166+
this.metaTableName, // 设置表名
167+
this.indexName); // 设置索引名
168+
IndexSchema indexSchema = getIndexSchema(indexForAttributes);
169+
// set name as index routing key
170+
IndexSetting indexSetting = new IndexSetting();
171+
indexSetting.setRoutingFields(Arrays.asList(TableMetaGenerator.CN_PK1));
172+
indexSchema.setIndexSetting(indexSetting);
173+
request.setIndexSchema(indexSchema);
174+
175+
Future<CreateSearchIndexResponse> future = this.asyncClient.createSearchIndex(request, null);
176+
Utils.waitForFuture(future);
177+
}
178+
179+
private IndexSchema getIndexSchema(List<AttributeIndexSchema> indexForAttributes) {
180+
IndexSchema indexSchema = new IndexSchema();
181+
List<FieldSchema> fieldSchemas = new ArrayList<FieldSchema>();
182+
if (indexForAttributes != null) {
183+
for (AttributeIndexSchema schema : indexForAttributes) {
184+
fieldSchemas.add(schema.getFieldSchema());
185+
}
186+
}
187+
fieldSchemas.add(
188+
new FieldSchema(TableMetaGenerator.CN_PK0, FieldType.KEYWORD));
189+
fieldSchemas.add(
190+
new FieldSchema(TableMetaGenerator.CN_PK1, FieldType.KEYWORD).setIndex(true));
191+
fieldSchemas.add(
192+
new FieldSchema(TableMetaGenerator.CN_PK2, FieldType.KEYWORD).setIndex(true).setIsArray(true));
193+
fieldSchemas.add(
194+
new FieldSchema(TableMetaGenerator.CN_TAMESTAMP_NAME, FieldType.LONG).setIndex(true).setStore(true));
195+
indexSchema.setFieldSchemas(fieldSchemas);
196+
return indexSchema;
197+
}
198+
199+
@Override
200+
public void deleteMetaTable() {
201+
deleteSearchIndexForMeta();
202+
DeleteTableRequest request = new DeleteTableRequest(this.metaTableName);
203+
Future<DeleteTableResponse> res = this.asyncClient.deleteTable(request, null);
204+
Utils.waitForFuture(res);
205+
}
206+
207+
private void deleteSearchIndexForMeta() {
208+
DeleteSearchIndexRequest request = new DeleteSearchIndexRequest();
209+
request.setTableName(this.metaTableName);
210+
request.setIndexName(indexName);
211+
Future<DeleteSearchIndexResponse> future = this.asyncClient.deleteSearchIndex(request, null);
212+
Utils.waitForFuture(future);
213+
}
214+
215+
private void checkMetaTableExist() {
216+
DescribeTableRequest request = new DescribeTableRequest(this.metaTableName);
217+
DescribeTableResponse resp = Utils.waitForFuture(this.asyncClient.describeTable(request, null));
218+
TableMeta tableMeta = resp.getTableMeta();
219+
TableMeta tableMetaExpect = TableMetaGenerator.getMetaTableMeta(this.metaTableName);
220+
List<PrimaryKeySchema> pks = tableMeta.getPrimaryKeyList();
221+
List<PrimaryKeySchema> pksExpect = tableMetaExpect.getPrimaryKeyList();
222+
if (pks.size() != pksExpect.size()) {
223+
throw new ClientException("Same table with different meta exist: " + this.metaTableName);
224+
}
225+
for (int i =0; i < pks.size(); ++i) {
226+
if (!pks.get(i).equals(pksExpect.get(i))) {
227+
throw new ClientException("Same table with different meta exist: " + this.metaTableName);
228+
}
229+
}
230+
}
231+
232+
private void checkIndexMetaExist() {
233+
ListSearchIndexRequest request = new ListSearchIndexRequest();
234+
request.setTableName(this.metaTableName);
235+
ListSearchIndexResponse resp = Utils.waitForFuture(this.asyncClient.listSearchIndex(request, null));
236+
List<SearchIndexInfo> indexInfos = resp.getIndexInfos();
237+
if (indexInfos.size() == 0) {
238+
throw new ClientException(String.format("Index for meta(%s) not exist: %s", this.metaTableName, this.indexName));
239+
}
240+
for (SearchIndexInfo indexInfo : indexInfos) {
241+
if (indexInfo.getIndexName().equals(this.indexName)) {
242+
return;
243+
}
244+
}
245+
throw new ClientException(String.format("Index for meta(%s) not exist: %s", this.metaTableName, this.indexName));
246+
}
247+
248+
@Override
249+
public void createDataTable(String tableName) {
250+
TableMeta tableMeta = TableMetaGenerator.getDataTableMeta(tableName);
251+
TableOptions tableOptions = new TableOptions();
252+
tableOptions.setMaxVersions(1);
253+
tableOptions.setTimeToLive(-1);
254+
CreateTableRequest request = new CreateTableRequest(
255+
tableMeta, tableOptions);
256+
Future<CreateTableResponse> res = this.asyncClient.createTable(request, null);
257+
Utils.waitForFuture(res);
258+
}
259+
260+
@Override
261+
public void deleteDataTable(String tableName) {
262+
DeleteTableRequest request = new DeleteTableRequest(tableName);
263+
Future<DeleteTableResponse> res = this.asyncClient.deleteTable(request, null);
264+
Utils.waitForFuture(res);
265+
}
266+
267+
@Override
268+
public synchronized TimestreamDataTable dataTable(String tableName) {
269+
tryInitMetaWriter();
270+
TimestreamDataTable dataTable = dataTableMap.get(tableName);
271+
if (dataTable == null) {
272+
if (dataTableMap.size() >= config.getMaxDataTableNumForWrite()) {
273+
throw new ClientException("Number of data table for writen in db cannot be larger than " + config.getMaxDataTableNumForWrite());
274+
}
275+
DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
276+
asyncClient,
277+
tableName,
278+
writerConfig,
279+
callback,
280+
executor
281+
);
282+
dataTable = new TimestreamDataTable(
283+
asyncClient,
284+
tableName,
285+
metaTableName,
286+
indexName,
287+
writer,
288+
metaCacheManager
289+
);
290+
dataTableMap.put(tableName, dataTable);
291+
}
292+
return dataTable;
293+
}
294+
295+
@Override
296+
public TimestreamMetaTable metaTable() {
297+
tryInitMetaWriter();
298+
TimestreamMetaTable metaTable = new TimestreamMetaTable(
299+
asyncClient,
300+
metaTableName,
301+
indexName,
302+
metaCacheManager
303+
);
304+
return metaTable;
305+
}
306+
}

0 commit comments

Comments
 (0)