diff --git a/pom.xml b/pom.xml
index c583dcd2..7032308a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
${project.encoding}
UTF-8
1.7.21
-
1.4.0
+ 1.4.3.1-SNAPSHOT
@@ -125,10 +125,70 @@
objenesis
3.0.1
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+
+
+ com.taobao.remoting
+ network.core
+ 1.2.5
+
+
+ com.alibaba
+ fastjson
+ 1.2.68.noneautotype
+
+
+ com.google.code.gson
+ gson
+ 2.8.6
+
+
+ com.google.errorprone
+ error_prone_annotations
+ 2.3.4
+
+
+ com.alipay.sofa
+ bolt
+ 1.6.5
+
+
+ commons-httpclient
+ commons-httpclient
+ 3.1
+
+
+ com.alibaba.toolkit.common
+ toolkit-common-lang
+ 1.1.5
+
+
+ commons-io
+ commons-io
+ 2.5
+
+
+ io.grpc
+ grpc-netty-shaded
+ 1.34.1
+
+
+ com.alipay.common
+ tracer
+ 1.0.39
+
+
+ mysql
+ mysql-connector-java
+ 5.1.40
+
org.apache.hadoop
hadoop-common
@@ -184,6 +244,12 @@
com.oceanbase
obkv-table-client
${table.client.version}
+
+
+ mysql-connector-java
+ mysql
+
+
org.apache.hbase
@@ -272,6 +338,14 @@
Sonatype Snapshot Repository
https://s01.oss.sonatype.org/content/repositories/snapshots/
+
+
+ true
+
+ ant-artifactory
+ Alipay Artifactory
+ http://mvn.test.alipay.net/artifactory/content/groups/public/
+
diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
index f0ae53e6..86681cdf 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
@@ -25,6 +25,9 @@
import com.alipay.oceanbase.hbase.util.*;
import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
+import com.alipay.oceanbase.hbase.util.ExecuteAbleManager;
+import com.alipay.oceanbase.rpc.OperationExecuteAble;
+import com.alipay.oceanbase.rpc.exception.ExceptionUtil;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
@@ -67,6 +70,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
@@ -92,6 +96,24 @@ public class OHTable implements HTableInterface {
*/
private final ObTableClient obTableClient;
+
+ /**
+ * === the operation executable for DDS support ===
+ */
+ private OperationExecuteAble executeAble;
+ private List qualifierWithNum = new ArrayList<>();
+
+ private String familyString;
+
+ private String currentTableFamily;
+
+ private boolean hTableQueryOptimize = false;
+ private boolean localHTableQueryOptimize = false;
+ private boolean localGetByBatch = false;
+ /**
+ * === the operation executable for DDS support end ===
+ */
+
/**
* the ohTable name in byte array
*/
@@ -206,14 +228,29 @@ public OHTable(Configuration configuration, String tableName) throws IOException
long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
- OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
- int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
- this.tableNameString, ohConnectionConf));
- this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
- this.obTableClient.setRuntimeRetryTimes(numRetries);
- setOperationTimeout(ohConnectionConf.getOperationTimeout());
+
+ // Check if DDS configuration is present
+ String ddsAppName = configuration.get(HBASE_OCEANBASE_DDS_APP_NAME);
+ String ddsAppDsName = configuration.get(HBASE_OCEANBASE_DDS_APP_DS_NAME);
+ String ddsVersion = configuration.get(HBASE_OCEANBASE_DDS_VERSION);
+
+ if (isNotBlank(ddsAppName) && isNotBlank(ddsAppDsName) && isNotBlank(ddsVersion)) {
+ // Use DDS mode with ExecuteAbleManager
+ this.obTableClient = null; // Not used in DDS mode
+ this.executeAble = ExecuteAbleManager.getOrCreateExecuteAble(configuration);
+ } else {
+ OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(
+ configuration);
+ int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ this.obTableClient = ObTableClientManager
+ .getOrCreateObTableClient(setUserDefinedNamespace(this.tableNameString,
+ ohConnectionConf));
+ this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
+ this.obTableClient.setRuntimeRetryTimes(numRetries);
+ setOperationTimeout(ohConnectionConf.getOperationTimeout());
+ this.executeAble = null;
+ }
finishSetUp();
}
@@ -259,15 +296,28 @@ public OHTable(Configuration configuration, final byte[] tableName,
this.tableNameString = Bytes.toString(tableName);
this.executePool = executePool;
this.cleanupPoolOnClose = false;
- OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
- int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
- this.tableNameString, ohConnectionConf));
- this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
- this.obTableClient.setRuntimeRetryTimes(numRetries);
- setOperationTimeout(ohConnectionConf.getOperationTimeout());
+ // Check if DDS configuration is present
+ if (configuration == null) {
+ throw new IllegalArgumentException("configuration is null.");
+ }
+ String ddsAppName = configuration.get(HBASE_OCEANBASE_DDS_APP_NAME);
+ String ddsAppDsName = configuration.get(HBASE_OCEANBASE_DDS_APP_DS_NAME);
+ String ddsVersion = configuration.get(HBASE_OCEANBASE_DDS_VERSION);
+ if (isNotBlank(ddsAppName) && isNotBlank(ddsAppDsName) && isNotBlank(ddsVersion)) {
+ this.executeAble = ExecuteAbleManager.getOrCreateExecuteAble(configuration);
+ this.obTableClient = null; // Not used in DDS mode
+ } else {
+ OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
+ int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
+ this.tableNameString, ohConnectionConf));
+ this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
+ this.obTableClient.setRuntimeRetryTimes(numRetries);
+ setOperationTimeout(ohConnectionConf.getOperationTimeout());
+ this.executeAble = null;
+ }
finishSetUp();
}
@@ -295,6 +345,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
this.closeClientOnClose = false;
this.executePool = executePool;
this.obTableClient = obTableClient;
+ this.executeAble = null;
this.configuration = new Configuration();
finishSetUp();
}
@@ -338,7 +389,8 @@ public OHTable(TableName tableName, Connection connection,
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
this.obTableClient.setRuntimeRetryTimes(numRetries);
setOperationTimeout(operationTimeout);
-
+ this.executeAble = this.obTableClient;
+
finishSetUp();
}
@@ -493,6 +545,52 @@ public Boolean[] exists(List gets) throws IOException {
return objectResults;
}
+ /**
+ * 将batch请求下所有请求的放在familyMap中。其中key为family,value是一个Pair类型:包含操作的索引值,以及操作内容
+ * @param index 请求的索引值
+ * @param innerFamilyMap 请求的一个pair容器
+ * @param familyMap batch请求的容器
+ */
+ private void geneInnerFamilyMap(int index, Map> innerFamilyMap,
+ Map, List>> familyMap) {
+
+ for (Map.Entry> entry : innerFamilyMap.entrySet()) {
+ String family = Bytes.toString(entry.getKey());
+ Pair, List> keyValueWithIndex = familyMap.get(family);
+ if (keyValueWithIndex == null) {
+ keyValueWithIndex = new Pair, List>(
+ new ArrayList(), new ArrayList());
+ familyMap.put(family, keyValueWithIndex);
+ }
+ keyValueWithIndex.getFirst().add(index);
+ keyValueWithIndex.getSecond().addAll(entry.getValue());
+ }
+ }
+ /**
+ * 将batch请求下的get请求放在getFamilyMap中
+ * @param index
+ * @param get
+ * @param getFamilyMap
+ */
+ private void geneGetInnerFamilyMap(int index, Get get, Map, List>> getFamilyMap) {
+ int count = 0;
+ for (Map.Entry> entry : get.getFamilyMap().entrySet()) {
+ count++;
+ if (count > 1) {
+ throw new FeatureNotSupportedException("do not support multi family");
+ }
+ String family = Bytes.toString(entry.getKey());
+ Pair, List> getsWithIndex = getFamilyMap.get(family);
+ if (getsWithIndex == null) {
+ getsWithIndex = new Pair, List>(new ArrayList(), new ArrayList());
+ getFamilyMap.put(family, getsWithIndex);
+ }
+ getsWithIndex.getFirst().add(index);
+ getsWithIndex.getSecond().add(get);
+ }
+ }
+
+
private BatchOperation compatOldServerPut(final List extends Row> actions, final Object[] results, BatchError batchError, int i, List puts)
throws Exception {
BatchOperationResult tmpResults;
@@ -646,14 +744,19 @@ public void batch(final List extends Row> actions, final Object[] results) thr
}
}
BatchError batchError = new BatchError();
- obTableClient.setRuntimeBatchExecutor(executePool);
+ if (obTableClient != null) {
+ obTableClient.setRuntimeBatchExecutor(executePool);
+ }
+ OperationExecuteAble executeAble = obTableClient == null ? this.executeAble : obTableClient;
List resultMapSingleOp = new LinkedList<>();
- if (!ObGlobal.isHBaseBatchSupport()) {
+ if (executeAble instanceof ObTableClient && !ObGlobal.isHBaseBatchSupport()) {
try {
compatOldServerBatch(actions, results, batchError);
} catch (Exception e) {
throw new IOException(e);
}
+ } else if (obTableClient == null) {
+ batchForDDS(actions);
} else {
String realTableName = getTargetTableName(actions);
BatchOperation batch = buildBatchOperation(realTableName, actions,
@@ -701,6 +804,91 @@ public void batch(final List extends Row> actions, final Object[] results) thr
}
}
+ /**
+ * Check if current table is in DDS mode
+ * @return true if in DDS mode (obTableClient is null and executeAble is not null)
+ */
+ private boolean isDDSMode() {
+ return obTableClient == null && executeAble != null;
+ }
+
+ /**
+ * 当batch请求中至少包含写入/删除/get中的2种类型的请求时,说明时混合的batch请求,返回true
+ * @param write 写入请求的数量
+ * @param delete 删除请求的数量
+ * @param get get请求的数量
+ * @return
+ */
+ private boolean isMix(int write, int delete, int get) {
+ int typeCount = (write > 0 ? 1 : 0) + (delete > 0 ? 1 : 0) + (get > 0 ? 1 : 0);
+ return typeCount >= 2;
+ }
+
+ /**
+ * 批量的batch请求,目前支持put和delete以及get。 不支持复杂的get请求,即设置maxVersion、filter等限制,只可以获取最新的版本
+ * @param actions
+ * @return
+ * @throws IOException
+ */
+ public Object[] batchForDDS(List extends Row> actions) throws IOException {
+ Map, List>> familyMap = new HashMap<>();
+ Map, List>> getsFamilyMap = new HashMap<>();
+ ArrayList writeBuffer = new ArrayList<>();
+ ArrayList deleteBuffer = new ArrayList<>();
+ ArrayList getBuffer = new ArrayList<>();
+ int idx = -1;
+ for (Row action : actions) {
+ idx++;
+ if (action instanceof Put) {
+ Put aPut = (Put) action;
+ validatePut(aPut);
+ checkFamilyViolation(aPut.getFamilyMap().keySet());
+ writeBuffer.add((Put) action);
+ currentWriteBufferSize += aPut.heapSize();
+ geneInnerFamilyMap(idx, aPut.getFamilyMap(), familyMap);
+ } else if (action instanceof Delete) {
+ Delete aDelete = (Delete) action;
+ checkFamilyViolation(aDelete.getFamilyMap().keySet());
+ deleteBuffer.add((Delete) action);
+ geneInnerFamilyMap(idx, aDelete.getFamilyMap(), familyMap);
+ } else if (action instanceof Get) {
+ Get aGet = (Get) action;
+ checkFamilyViolation(aGet.getFamilyMap().keySet());
+ getBuffer.add((Get) action);
+ geneGetInnerFamilyMap(idx, aGet, getsFamilyMap);
+ } else {
+ throw new FeatureNotSupportedException("only support put, delete or get in a batch");
+ }
+ }
+
+ boolean[] resultSuccess = new boolean[actions.size()];
+ Object[] getsResults = new Object[actions.size()];
+
+ // 先执行gets,再执行mutate,否则可能get的结果不一致
+ for (Map.Entry, List>> entry : getsFamilyMap.entrySet()) {
+ Result[] tmpRes = get(entry.getValue().getSecond());
+ int pos = 0;
+ for (Integer index : entry.getValue().getFirst()) {
+ resultSuccess[index] = true;
+ getsResults[index] = tmpRes[pos];
+ pos++;
+ }
+ }
+ boolean isMix = isMix(writeBuffer.size(), deleteBuffer.size(), 0);
+ Object[] results = familyMapBatch(familyMap, resultSuccess, isMix);
+
+ // merge result
+ for (int i = 0; i < getsResults.length; i++) {
+ if (getsResults[i] != null) {
+ if (results[i] != null) {
+ throw new IOException("unexpected batch result, table: " + tableNameString + ", result: " + results[i]);
+ }
+ results[i] = getsResults[i];
+ }
+ }
+ return results;
+ }
+
private List generateGetResult(ObTableSingleOpResult getResult) throws IOException {
List cells = new ArrayList<>();
ObTableSingleOpEntity singleOpEntity = getResult.getEntity();
@@ -875,6 +1063,9 @@ private void processColumnFilters(NavigableSet columnFilters,
@Override
public Result get(final Get get) throws IOException {
+ if (isDDSMode()) {
+ return getForDDS(get);
+ }
if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().isEmpty()) {
// check nothing, use table group;
} else {
@@ -949,7 +1140,13 @@ public Result call() throws IOException {
@Override
public Result[] get(List gets) throws IOException {
Result[] results = new Result[gets.size()];
- if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version
+
+ // In DDS mode, avoid potential recursion by using individual get operations
+ if (isDDSMode()) {
+ for (int i = 0; i < gets.size(); i++) {
+ results[i] = get(gets.get(i));
+ }
+ } else if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version
batch(gets, results);
} else {
List> futures = new LinkedList<>();
@@ -969,6 +1166,76 @@ public Result[] get(List gets) throws IOException {
return results;
}
+ public Result getForDDS(final Get get) throws IOException {
+ checkFamilyViolation(get.getFamilyMap().keySet());
+ ServerCallable serverCallable = getRequestCallable(get);
+ return executeServerCallable(serverCallable);
+ }
+
+ private ServerCallable getRequestCallable(Get get) {
+ return new ServerCallable(configuration,
+ executeAble,
+ tableNameString,
+ get.getRow(),
+ get.getRow(),
+ operationTimeout) {
+ public Result call() throws IOException {
+ List keyValueList = new ArrayList();
+ for (Map.Entry> entry : get.getFamilyMap().entrySet()) {
+ byte[] family = entry.getKey();
+ String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family), configuration);
+
+ try {
+ long t1 = System.currentTimeMillis();
+ ObTableQuery obTableQuery;
+ ObHTableFilter filter = buildObHTableFilter(get.getFilter(),
+ get.getTimeRange(), get.getMaxVersions(), entry.getValue());
+
+ if (queryOptimized(entry.getValue())) {
+ obTableQuery = buildObTableQuery(filter, get.getRow(), get.getRow(),
+ get.getTimeRange(), -1, entry.getValue());
+ } else {
+ obTableQuery = buildObTableQuery(filter, get.getRow(), true,
+ get.getRow(), true, -1);
+ }
+
+ ObTableQueryRequest request = buildObTableQueryRequest(obTableQuery, targetTableName);
+ OperationExecuteAble executeAble = obTableClient != null ? this.obTableClient : this.executeAble;
+ ObTableClientQueryStreamResult clientQueryStreamResult = (ObTableClientQueryStreamResult) executeAble.execute(request);
+ byte[] newK = new byte[0];
+ byte[] newQ = new byte[0];
+ while (clientQueryStreamResult.next()) {
+ List row = clientQueryStreamResult.getRow();
+ byte[] k = (byte[]) row.get(0).getValue();
+ byte[] q = (byte[]) row.get(1).getValue();
+ long t = (Long) row.get(2).getValue();
+ byte[] v = (byte[]) row.get(3).getValue();
+ if (k == null && q == null) {
+ k = newK;
+ q = newQ;
+ } else {
+ newK = k;
+ newQ = q;
+ }
+ keyValueList.add(new KeyValue(k, family, q, t, v));
+ }
+
+ TableHBaseLoggerFactory.MONITOR.info("get cost: {}ms, table: {}, optimize: {}, cells: {}",
+ (System.currentTimeMillis() - t1),
+ getTargetTableName(tableNameString, Bytes.toString(family), configuration),
+ queryOptimized(), keyValueList.size());
+ } catch (Exception e) {
+ TableHBaseLoggerFactory.logger.error(LCD.convert("01-00002"), tableNameString,
+ Bytes.toString(family), e);
+ throw new IOException("query table:" + tableNameString + " family "
+ + Bytes.toString(family) + " error.", e);
+ }
+ }
+ return new Result(keyValueList);
+ }
+ };
+ }
+
/**
* hbase的获取这个rowkey,没有的话就前一个rowkey,不支持
* @param row row
@@ -982,6 +1249,10 @@ public Result getRowOrBefore(byte[] row, byte[] family) {
@Override
public ResultScanner getScanner(final Scan scan) throws IOException {
+ if (obTableClient == null) {
+ return getScannerForDDS(scan);
+ }
+
if (scan.getFamilyMap().keySet().isEmpty()) {
// check nothing, use table group;
} else {
@@ -1162,6 +1433,129 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept
return getScanner(scan);
}
+ public boolean queryOptimized() {
+ return this.localHTableQueryOptimize || this.hTableQueryOptimize;
+ }
+
+ public boolean queryOptimized(Set qualifiers) {
+ boolean ret = false;
+ if (queryOptimized() && qualifiers != null && !qualifiers.isEmpty()) {
+ for (byte[] aQualifier : qualifiers) {
+ if (aQualifier != null && aQualifier.length > 0) {
+ ret = true;
+ } else {
+ return false;
+ }
+ }
+ }
+ return ret;
+ }
+
+ public ResultScanner getScannerForDDS(final Scan scan) throws IOException {
+ checkFamilyViolation(scan.getFamilyMap().keySet());
+ //be careful about the packet size, if the packet exceed the max result size, leading to error
+ ServerCallable serverCallable = new ServerCallable(
+ configuration, executeAble, tableNameString, scan.getStartRow(), scan.getStopRow(),
+ operationTimeout) {
+ public ResultScanner call() throws Exception {
+ if (scan.getFamilyMap().keySet().isEmpty() || scan.getFamilyMap().size() > 1) {
+ throw new FeatureNotSupportedException("scan family map is not supported yet");
+ } else {
+ for (Map.Entry> entry : scan.getFamilyMap().entrySet()) {
+ byte[] f = entry.getKey();
+ try {
+ ObHTableFilter filter = buildObHTableFilter(scan.getFilter(),
+ scan.getTimeRange(), scan.getMaxVersions(), entry.getValue());
+ ObTableQuery obTableQuery;
+ if (Arrays.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
+ && Arrays.equals(scan.getStopRow(), HConstants.EMPTY_START_ROW)) {
+ obTableQuery = buildObTableQuery(filter, (ObNewRange)null, scan.getBatch());
+ } else if (!scan.isReversed() && queryOptimized(entry.getValue())) {
+ obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
+ scan.getStopRow(), false, scan.getBatch(), entry.getValue());
+ } else if (scan.isReversed()) {
+ // 由于 HBase 接口与 OB 接口表达范围的差异, reverse scan 需要交换 startRow 和 stopRow
+ obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
+ scan.getStartRow(), true, scan.getBatch());
+ obTableQuery.setScanOrder(ObScanOrder.Reverse);
+ } else {
+ obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
+ scan.getStopRow(), false, scan.getBatch());
+ }
+
+ obTableQuery.setMaxResultSize(scan.getMaxResultSize());
+ long t1 = System.currentTimeMillis();
+ ObTableQueryRequest request = buildObTableQueryRequest(obTableQuery,
+ getTargetTableName(tableNameString, Bytes.toString(f), configuration));
+ ObTableClientQueryStreamResult clientQueryStreamResult = (ObTableClientQueryStreamResult) executeAble
+ .execute(request);
+ TableHBaseLoggerFactory.MONITOR.info("get cost query result: {}ms, table: {}, query optimize: {}",
+ (System.currentTimeMillis() - t1),
+ getTargetTableName(tableNameString, Bytes.toString(f), configuration), queryOptimized());
+ return new ClientStreamScanner(clientQueryStreamResult, tableNameString, f, false);
+ } catch (Exception e) {
+ logger
+ .error(LCD.convert("01-00003"), tableNameString, Bytes.toString(f), e);
+ throw new IOException("scan table:" + tableNameString + " family "
+ + Bytes.toString(f) + " error.", e);
+ }
+ }
+ }
+
+ throw new IOException("scan table:" + tableNameString + "has no family");
+ }
+ };
+ return executeServerCallable(serverCallable);
+ }
+
+ private void checkFamilyViolation(Collection families) {
+ for (byte[] family : families) {
+ if (isBlank(Bytes.toString(family))) {
+ throw new IllegalArgumentException("family is blank");
+ }
+ this.familyString = new String(family, StandardCharsets.UTF_8);
+ this.currentTableFamily = tableNameString.toLowerCase() + CH_SPLITE_TYPE + familyString.toLowerCase();
+ }
+
+ }
+
+ private void innerDeleteForDDS(Delete delete) throws IOException {
+ checkArgument(delete.getRow() != null, "row is null");
+ checkArgument(!delete.isEmpty(), "delete is empty");
+ List errorCodeList = new ArrayList();
+ try {
+ checkFamilyViolation(delete.getFamilyMap().keySet());
+
+ Map.Entry> entry = delete.getFamilyMap().entrySet().iterator()
+ .next();
+ ObTableBatchOperation batch = buildObTableBatchOperation(entry.getValue(), false, null,
+ false);
+
+ ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch,
+ getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration));
+ ObTableBatchOperationResult result = (ObTableBatchOperationResult) executeAble
+ .execute(request);
+ boolean hasError = false;
+ int throwErrorCode = 0;
+ for (ObTableOperationResult obTableOperationResult : result.getResults()) {
+ int errorCode = obTableOperationResult.getHeader().getErrno();
+ errorCodeList.add(errorCode);
+ if (errorCode != 0) {
+ hasError = true;
+ throwErrorCode = errorCode;
+ }
+ }
+
+ if (hasError) {
+ throw new ObTableException(throwErrorCode);
+ }
+ } catch (Exception e) {
+ logger.error(LCD.convert("01-00004"), tableNameString, errorCodeList, e);
+ throw new IOException("delete table " + tableNameString + " error codes "
+ + errorCodeList, e);
+ }
+ }
+
@Override
public void put(Put put) throws IOException {
doPut(Collections.singletonList(put));
@@ -1282,13 +1676,23 @@ private void innerDelete(Delete delete) throws IOException {
@Override
public void delete(Delete delete) throws IOException {
checkFamilyViolation(delete.getFamilyMap().keySet(), false);
- innerDelete(delete);
+ if (isDDSMode()) {
+ innerDeleteForDDS(delete);
+ } else {
+ innerDelete(delete);
+ }
}
@Override
public void delete(List deletes) throws IOException {
- for (Delete delete : deletes) {
- innerDelete(delete);
+ if (isDDSMode()) { // DDS mode
+ for (Delete delete : deletes) {
+ innerDeleteForDDS(delete);
+ }
+ } else {
+ for (Delete delete : deletes) {
+ innerDelete(delete);
+ }
}
}
@@ -1366,8 +1770,12 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration));
- ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
- .execute(request);
+ ObTableQueryAndMutateResult result = null;
+ if (!isDDSMode()) {
+ result = (ObTableQueryAndMutateResult) obTableClient.execute(request);
+ } else {
+ result = (ObTableQueryAndMutateResult) executeAble.execute(request);
+ }
return result.getAffectedRows() > 0;
}
@@ -1405,9 +1813,16 @@ public Result append(Append append) throws IOException {
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batchOperation,
getTargetTableName(tableNameString, Bytes.toString(f), configuration));
- request.setReturningAffectedEntity(true);
- ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
- .execute(request);
+ request.setReturningAffectedEntity(append.isReturnResults());
+ ObTableQueryAndMutateResult result = null;
+ if (obTableClient != null) {
+ result = (ObTableQueryAndMutateResult) obTableClient.execute(request);
+ } else {
+ result = (ObTableQueryAndMutateResult) executeAble.execute(request);
+ }
+ if (!append.isReturnResults()) {
+ return null;
+ }
ObTableQueryResult queryResult = result.getAffectedEntity();
List keyValues = new ArrayList();
for (List row : queryResult.getPropertiesRows()) {
@@ -1466,8 +1881,12 @@ public Result increment(Increment increment) throws IOException {
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration));
request.setReturningAffectedEntity(true);
- ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
- .execute(request);
+ ObTableQueryAndMutateResult result = null;
+ if (obTableClient != null) {
+ result = (ObTableQueryAndMutateResult) obTableClient.execute(request);
+ } else {
+ result = (ObTableQueryAndMutateResult) executeAble.execute(request);
+ }
ObTableQueryResult queryResult = result.getAffectedEntity();
List keyValues = new ArrayList();
for (List row : queryResult.getPropertiesRows()) {
@@ -1516,8 +1935,12 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration));
request.setReturningAffectedEntity(true);
- ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
- .execute(request);
+ ObTableQueryAndMutateResult result = null;
+ if (obTableClient != null) {
+ result = (ObTableQueryAndMutateResult) obTableClient.execute(request);
+ } else {
+ result = (ObTableQueryAndMutateResult) executeAble.execute(request);
+ }
ObTableQueryResult queryResult = result.getAffectedEntity();
if (queryResult.getPropertiesRows().size() != 1) {
throw new IllegalStateException("the increment result size illegal "
@@ -1550,7 +1973,10 @@ public boolean isAutoFlush() {
@Override
public void flushCommits() throws IOException {
-
+ if (isDDSMode()) {
+ flushCommitsForDDS();
+ return;
+ }
try {
if (writeBuffer.isEmpty()){
return;
@@ -1611,6 +2037,45 @@ public void flushCommits() throws IOException {
}
}
}
+
+ private void flushCommitsForDDS() throws IOException {
+ try {
+ boolean[] resultSuccess = new boolean[writeBuffer.size()];
+ try {
+ Map, List>> familyMap = new HashMap, List>>();
+ for (int i = 0; i < writeBuffer.size(); i++) {
+ Put aPut = writeBuffer.get(i);
+ Map> innerFamilyMap = aPut.getFamilyMap();
+ // multi family can not ensure automatic
+ geneInnerFamilyMap(i, innerFamilyMap, familyMap);
+ }
+ familyMapBatch(familyMap, resultSuccess, false);
+
+ } finally {
+ // mutate list so that it is empty for complete success, or contains
+ // only failed records results are returned in the same order as the
+ // requests in list walk the list backwards, so we can remove from list
+ // without impacting the indexes of earlier members
+ for (int i = resultSuccess.length - 1; i >= 0; i--) {
+ if (resultSuccess[i]) {
+ // successful Puts are removed from the list here.
+ writeBuffer.remove(i);
+ }
+ }
+ }
+ } finally {
+ if (clearBufferOnFail) {
+ writeBuffer.clear();
+ currentWriteBufferSize = 0;
+ } else {
+ // the write buffer was adjusted by processBatchOfPuts
+ currentWriteBufferSize = 0;
+ for (Put aPut : writeBuffer) {
+ currentWriteBufferSize += aPut.heapSize();
+ }
+ }
+ }
+ }
@Override
public void close() throws IOException {
@@ -1742,8 +2207,12 @@ public void batchCoprocessorService(Descriptors.MethodDescri
@Override
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
- this.obTableClient.setRuntimeMaxWait(operationTimeout);
- this.obTableClient.setRuntimeBatchMaxWait(operationTimeout);
+ if (this.obTableClient != null) {
+ this.obTableClient.setRuntimeMaxWait(operationTimeout);
+ this.obTableClient.setRuntimeBatchMaxWait(operationTimeout);
+ } else { // TODO: dds set configuration
+
+ }
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -1768,7 +2237,75 @@ public int getRpcTimeout() {
}
public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
- this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
+ if (this.obTableClient != null) {
+ this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
+ }
+ }
+
+ /**
+ * 封装batch请求。返回batch操作的结果集results
+ * @param familyMap 所有的batch请求的容器
+ * @param resultSuccess batch请求的返回值数组
+ * @param isMix 是否为混合请求的batch
+ * @throws IOException
+ */
+ private Object[] familyMapBatch(Map, List>> familyMap,
+ boolean[] resultSuccess, boolean isMix) throws IOException {
+ Object[] batchResults = new Object[resultSuccess.length];
+
+ for (Map.Entry, List>> entry : familyMap.entrySet()) {
+ List errorCodeList = new ArrayList(entry.getValue().getSecond().size());
+ ObTableBatchOperation batch;
+ try {
+ String targetTableName = getTargetTableName(this.tableNameString, entry.getKey(), configuration);
+ batch = buildObTableBatchOperation(entry.getValue().getSecond(), false, null,
+ isMix);
+ ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch, targetTableName);
+ ObTableBatchOperationResult result = (ObTableBatchOperationResult) executeAble.execute(request);
+ List resultList = result.getResults();
+ Map throwResults = new HashMap<>();
+
+ for (ObTableOperationResult opResult : resultList) {
+ int errorCode = opResult.getHeader().getErrno();
+ errorCodeList.add(errorCode);
+ if (errorCode != 0) {
+ throwResults.put(entry.getValue().getFirst().get(0), opResult);
+ }
+ }
+
+ for (Map.Entry throwResult : throwResults.entrySet()) {
+ if (throwResult.getValue() != null) {
+ ExceptionUtil.throwObTableException(throwResult.getValue().getExecuteHost(),
+ throwResult.getValue().getExecutePort(), throwResult.getValue().getSequence(),
+ throwResult.getValue().getUniqueId(), throwResult.getValue().getHeader().getErrno(), targetTableName);
+ }
+ }
+
+ for (Integer index : entry.getValue().getFirst()) {
+ resultSuccess[index] = true;
+ List keyValues = new ArrayList<>();
+ keyValues.add(new KeyValue());
+ Result kvRes = new Result(keyValues);
+ batchResults[index] = kvRes;
+ }
+ } catch (Exception e) {
+ if (e instanceof ObTableException) {
+ ObTableException ee = (ObTableException)e;
+ errorCodeList.add(ee.getErrorCode());
+ }
+
+ logger.error(LCD.convert("01-00008"), tableNameString, errorCodeList, autoFlush,
+ resultSuccess.length, e);
+
+ throw new IOException(
+ "batch mutate table " + tableNameString + ", error codes " + errorCodeList
+ + ", auto flush " + autoFlush + ", and current buffer size "
+ + resultSuccess.length
+ + ". batch opertaions failed. may caused by put and delete the same k/q at a batch or "
+ + "don't have any data", e);
+ }
+ }
+ return batchResults;
}
T executeServerCallable(final ServerCallable serverCallable) throws IOException {
@@ -1858,6 +2395,118 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i
return obHTableFilter;
}
+ private ObTableQuery buildObTableQuery(ObHTableFilter filter, ObNewRange obNewRange,
+ int batchSize) {
+ ObTableQuery obTableQuery = new ObTableQuery();
+ obTableQuery.setIndexName("PRIMARY");
+ obTableQuery.sethTableFilter(filter);
+ for (String column : ALL_COLUMNS) {
+ obTableQuery.addSelectColumn(column);
+ }
+ if (obNewRange != null) {
+ obTableQuery.addKeyRange(obNewRange);
+ }
+ if (batchSize > 0) {
+ obTableQuery.setBatchSize(batchSize);
+ }
+ return obTableQuery;
+ }
+
+ private ObTableQuery buildObTableQuery(ObHTableFilter filter,
+ byte[] start, boolean includeStart,
+ byte[] stop, boolean includeStop,
+ int batchSize, Set qualifiers) {
+ List obRangeList = new ArrayList<>();
+ for (byte[] qualifier : qualifiers) {
+ if (qualifier.length == 0) {
+ throw new ObTableUnexpectedException("qualifier is empty when use htable optimize query");
+ }
+
+ ObNewRange obNewRange = new ObNewRange();
+ if (includeStart) {
+ obNewRange.setStartKey(ObRowKey.getInstance(start, qualifier, ObObj.getMin()));
+ } else {
+ obNewRange.setStartKey(ObRowKey.getInstance(start, qualifier, ObObj.getMax()));
+ }
+
+ if (Arrays.equals(stop, HConstants.EMPTY_START_ROW)) {
+ obNewRange.setEndKey(ObRowKey.getInstance(ObObj.getMax(), qualifier, ObObj.getMax()));
+ } else if (includeStop) {
+ obNewRange.setEndKey(ObRowKey.getInstance(stop, qualifier, ObObj.getMax()));
+ } else {
+ obNewRange.setEndKey(ObRowKey.getInstance(stop, qualifier, ObObj.getMin()));
+ }
+ obRangeList.add(obNewRange);
+ }
+
+ return buildObTableQuery(filter, obRangeList, batchSize);
+ }
+
+ private ObTableQuery buildObTableQuery(ObHTableFilter filter, List obNewRangeList,
+ int batchSize) {
+ ObTableQuery obTableQuery = new ObTableQuery();
+ obTableQuery.setIndexName("PRIMARY");
+ obTableQuery.sethTableFilter(filter);
+ for (String column : ALL_COLUMNS) {
+ obTableQuery.addSelectColumn(column);
+ }
+ if (obNewRangeList.size() > 0) {
+ for (ObNewRange obNewRange : obNewRangeList) {
+ obTableQuery.addKeyRange(obNewRange);
+ }
+ }
+ if (batchSize > 0) {
+ obTableQuery.setBatchSize(batchSize);
+ }
+ return obTableQuery;
+ }
+ private ObTableQuery buildObTableQuery(ObHTableFilter filter,
+ byte[] start, byte[] stop, TimeRange tr,
+ int batchSize, Set qualifiers) {
+ List obRangeList = new ArrayList<>();
+ for (byte[] qualifier : qualifiers) {
+ if (qualifier.length == 0) {
+ throw new ObTableUnexpectedException("qualifier is empty when use htable optimize query");
+ }
+
+ ObNewRange obNewRange = new ObNewRange();
+ obNewRange.setStartKey(ObRowKey.getInstance(start, qualifier, -tr.getMax()));
+ if (Arrays.equals(stop, HConstants.EMPTY_START_ROW)) {
+ obNewRange.setEndKey(ObRowKey.getInstance(ObObj.getMax(), qualifier, -tr.getMin()));
+ } else {
+ obNewRange.setEndKey(ObRowKey.getInstance(stop, qualifier, -tr.getMin()));
+ }
+ obRangeList.add(obNewRange);
+ }
+
+ return buildObTableQuery(filter, obRangeList, batchSize);
+ }
+
+ private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start,
+ boolean includeStart, byte[] stop, boolean includeStop,
+ int batchSize) {
+ ObNewRange obNewRange = new ObNewRange();
+
+ if (includeStart) {
+ obNewRange.setStartKey(ObRowKey.getInstance(start, ObObj.getMin(), ObObj.getMin()));
+ } else {
+ obNewRange.setStartKey(ObRowKey.getInstance(start, ObObj.getMax(), ObObj.getMax()));
+ }
+
+ if (Arrays.equals(stop, HConstants.EMPTY_START_ROW)) {
+ obNewRange.setEndKey(ObRowKey.getInstance(ObObj.getMax(), ObObj.getMax(),
+ ObObj.getMax()));
+ } else if (includeStop) {
+ obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMax(), ObObj.getMax()));
+ } else {
+ obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMin(), ObObj.getMin()));
+ }
+
+ return buildObTableQuery(filter, obNewRange, batchSize);
+ }
+
+
+
private byte[] buildCheckAndMutateFilterString(byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value)
throws IOException {
@@ -2009,6 +2658,26 @@ public static ObTableBatchOperation buildObTableBatchOperation(List ke
return batch;
}
+ private ObTableBatchOperation buildObTableBatchOperation(List keyValueList,
+ boolean putToAppend,
+ List qualifiers,
+ boolean isMix) {
+ ObTableBatchOperation batch = new ObTableBatchOperation();
+ for (KeyValue kv : keyValueList) {
+ if (qualifiers != null) {
+ qualifiers.add(kv.getQualifier());
+ }
+ batch.addTableOperation(buildObTableOperation(kv, putToAppend));
+ }
+ // batch with the same type
+ batch.setSameType(!isMix);
+ batch.setSamePropertiesNames(true);
+ return batch;
+ }
+
+
+
+
private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
ObTableOperationType operationType,
boolean isTableGroup) {
diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
index f0d4a447..f2c5e5ac 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
@@ -19,6 +19,7 @@
import com.alipay.oceanbase.hbase.constants.OHConstants;
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
+import com.alipay.oceanbase.hbase.util.ExecuteAbleManager;
import com.alipay.oceanbase.hbase.util.KeyDefiner;
import com.alipay.oceanbase.hbase.util.OHTableFactory;
import com.google.protobuf.Descriptors;
@@ -39,12 +40,14 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
public class OHTablePool implements Closeable {
@@ -156,6 +159,43 @@ public OHTablePool(final Configuration config, final int maxSize,
this.tables = new PoolMap<>(this.poolType, this.maxSize);
}
+ public void init() throws Exception {
+ if (tableAttributes != null) {
+ Map> tableAttributeMap = new HashMap>();
+ for (Map.Entry entry : tableAttributes.entrySet()) {
+ String key = entry.getKey();
+ byte[] value = entry.getValue();
+ String[] parsedKey = KeyDefiner.parsePoolOHTableAttributeName(key);
+ if (parsedKey != null) {
+ String tableName = parsedKey[0];
+ String attributeName = parsedKey[1];
+
+ Map attributeMap = tableAttributeMap.get(tableName);
+
+ if (attributeMap == null) {
+ attributeMap = new HashMap();
+ tableAttributeMap.put(tableName, attributeMap);
+ }
+
+ attributeMap.put(attributeName, Bytes.toString(value));
+ }
+ }
+
+ for (Map.Entry> entry : tableAttributeMap.entrySet()) {
+ Map attributeMap = entry.getValue();
+ String paramUrl = attributeMap.get(HBASE_OCEANBASE_PARAM_URL);
+ String fullUserName = attributeMap.get(HBASE_OCEANBASE_FULL_USER_NAME);
+ String password = attributeMap.get(HBASE_OCEANBASE_PASSWORD);
+ ExecuteAbleManager.getOrCreateObTableClient(config, paramUrl, fullUserName,
+ password);
+ }
+ }
+
+ if (isNotBlank(config.get(HBASE_OCEANBASE_DDS_APP_NAME))) {
+ ExecuteAbleManager.getOrCreateDdsObTableClient(config);
+ }
+ }
+
/**
* Get a reference to the specified table from the pool.
*
diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
index e10ef0b1..4f5c2ea8 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java
@@ -161,6 +161,20 @@ public final class OHConstants {
public static final String SOCKET_TIMEOUT = "ipc.socket.timeout";
- public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
+public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
+
+
+ /**
+ * DDS (Data Distribution Service) related constants
+ */
+ public static final String HBASE_OCEANBASE_DDS_APP_NAME = "hbase.oceanbase.dds.app.name";
+ public static final String HBASE_OCEANBASE_DDS_APP_DS_NAME = "hbase.oceanbase.dds.app.ds.name";
+ public static final String HBASE_OCEANBASE_DDS_VERSION = "hbase.oceanbase.dds.version";
+ public static final String HBASE_HTABLE_CLIENT_WRITE_BUFFER = "hbase.client.write.buffer";
+
+ public static final long DEFAULT_HBASE_HTABLE_CLIENT_WRITE_BUFFER = 2097152;
+
+ public static final char CH_SINGLE_QUOTA = '\'';
+ public static final char CH_SPLITE_TYPE = '|';
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java b/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java
index 805503cf..4b3c125f 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java
@@ -18,6 +18,7 @@
package com.alipay.oceanbase.hbase.execute;
import com.alipay.oceanbase.rpc.ObTableClient;
+import com.alipay.oceanbase.rpc.OperationExecuteAble;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ConnectionUtils;
@@ -34,6 +35,7 @@ public abstract class ServerCallable implements Callable {
protected final Configuration conf;
protected final ObTableClient obTableClient;
+ protected final OperationExecuteAble executeAble;
protected final String tableNameString;
protected int callTimeout;
protected long globalStartTime, endTime;
@@ -52,6 +54,27 @@ public ServerCallable(Configuration conf, ObTableClient obTableClient, String ta
byte[] startRow, byte[] endRow, int callTimeout) {
this.conf = conf;
this.obTableClient = obTableClient;
+ this.executeAble = null;
+ this.tableNameString = tableNameString;
+ this.callTimeout = callTimeout;
+ this.startRow = startRow;
+ this.endRow = endRow;
+ }
+
+ /**
+ * ServerCallable
+ * @param conf the conf to use
+ * @param executeAble executeAble to use
+ * @param tableNameString Table name to which tableNameString belongs.
+ * @param startRow start row
+ * @param endRow end row
+ * @param callTimeout timeout
+ */
+ public ServerCallable(Configuration conf, OperationExecuteAble executeAble, String tableNameString,
+ byte[] startRow, byte[] endRow, int callTimeout) {
+ this.conf = conf;
+ this.obTableClient = null;
+ this.executeAble = executeAble;
this.tableNameString = tableNameString;
this.callTimeout = callTimeout;
this.startRow = startRow;
diff --git a/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/OHTableMetaInitializer.java b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/OHTableMetaInitializer.java
new file mode 100644
index 00000000..4c2febab
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/OHTableMetaInitializer.java
@@ -0,0 +1,46 @@
+/*-
+ * #%L
+ * OceanBase Table Hbase Framework
+ * %%
+ * Copyright (C) 2016 - 2018 Ant Financial Services Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.qualifiertype;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author qiaoyunyao
+ * @version 1: OHTableMetaInitializer.java, v 0.1 2023年07月14日 16:59 qiaoyunyao Exp $
+ */
+public class OHTableMetaInitializer {
+
+ private static ConcurrentHashMap> tableFamilyTypeMap = new ConcurrentHashMap<>();
+
+ // 获取用户qualifier的类型信息,缓存到qualifierTypeMap中。新插入的k-v会覆盖原本的v
+ public static void setQualifierType(String tableName, String family, Map qualifierTypeMaps) {
+ String tableFamily = tableName + "|" + family;
+ tableFamilyTypeMap.put(tableFamily, qualifierTypeMaps);
+
+ }
+
+ public static ConcurrentHashMap> getTableFamilyTypeMap() {
+ return tableFamilyTypeMap;
+ }
+
+ public static void clearQualifierMap(){
+ tableFamilyTypeMap.clear();
+ }
+
+}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/ObQualifierType.java b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/ObQualifierType.java
new file mode 100644
index 00000000..3b1b8cdc
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/ObQualifierType.java
@@ -0,0 +1,46 @@
+/*-
+ * #%L
+ * OceanBase Table Hbase Framework
+ * %%
+ * Copyright (C) 2016 - 2018 Ant Financial Services Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.qualifiertype;
+
+/**
+ * @author qiaoyunyao
+ * @version 1: ObQualifierType.java, v 0.1 2023年07月12日 18:16 qiaoyunyao Exp $
+ */
+public enum ObQualifierType {
+ INVALID_TYPE(0), Long(1), Int(2), Double(3);
+
+ private final int code;
+
+ ObQualifierType(int code) {
+ this.code = code;
+ }
+
+ @Override
+ public String toString() {
+ switch (code) {
+ case 1:
+ return "Long";
+ case 2:
+ return "Int";
+ case 3:
+ return "Double";
+ default:
+ return "INVALID_TYPE";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ExecuteAbleManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ExecuteAbleManager.java
new file mode 100644
index 00000000..20b6991f
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/ExecuteAbleManager.java
@@ -0,0 +1,356 @@
+/*-
+ * #%L
+ * OceanBase Table Hbase Framework
+ * %%
+ * Copyright (C) 2016 - 2018 Ant Financial Services Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.util;
+
+import com.alipay.oceanbase.rpc.ObTableClient;
+import com.alipay.oceanbase.rpc.OperationExecuteAble;
+import com.alipay.oceanbase.rpc.constant.Constants;
+import com.alipay.oceanbase.rpc.property.Property;
+import com.alipay.oceanbase.rpc.dds.DdsObTableClient;
+import com.google.common.base.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
+import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
+import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+/**
+ * @author zhiqi.zzq
+ * @since 2018/7/24 下午9:23
+ */
+public class ExecuteAbleManager {
+ private static final Logger logger = TableHBaseLoggerFactory
+ .getLogger("ExecuteAbleManager");
+
+ public static final ConcurrentHashMap OB_TABLE_CLIENT_LOCK = new ConcurrentHashMap();
+ public static final Map OB_TABLE_CLIENT_INSTANCE = new ConcurrentHashMap();
+
+ public static final ConcurrentHashMap DDS_OB_TABLE_CLIENT_LOCK = new ConcurrentHashMap();
+ public static final Map DDS_OB_TABLE_CLIENT_INSTANCE = new ConcurrentHashMap();
+
+ /**
+ * Get or create the execution handler.
+ *
+ * Generally , we should not mix the table-specific paramURL mode with the appDataSource mode.
+ * If you do mix the methods you should be clearly aware of that you will get the sharding client when
+ * the specific table is not declared。
+ *
+ * @param conf the config
+ */
+ public static OperationExecuteAble getOrCreateExecuteAble(Configuration conf)
+ throws IllegalArgumentException,
+ IOException {
+ // 1. try to get ObTableClient according to the table-specific paramURL
+ String paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL);
+ String fullUserName = conf.get(HBASE_OCEANBASE_FULL_USER_NAME);
+ if (isNotBlank(paramUrl) || isNotBlank(fullUserName)) {
+ String password = conf.get(HBASE_OCEANBASE_PASSWORD, Constants.EMPTY_STRING);
+ return getOrCreateObTableClient(conf, paramUrl, fullUserName, password);
+ }
+
+ // 2. otherwise, fall into the DdsObTableClient branch
+ return getOrCreateDdsObTableClient(conf);
+ }
+
+ /**
+ * Get or create ObTableClient according to the obTableClientKey.
+ *
+ * @param obTableClientKey
+ * @return
+ * @throws IOException
+ */
+ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey)
+ throws IOException {
+ if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) {
+ ReentrantLock tmp = new ReentrantLock();
+ ReentrantLock lock = OB_TABLE_CLIENT_LOCK.putIfAbsent(obTableClientKey, tmp);
+ lock = lock == null ? tmp : lock;
+ lock.lock();
+ try {
+ if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) {
+ ObTableClient obTableClient = new ObTableClient();
+ obTableClient.setParamURL(obTableClientKey.getParamUrl());
+ obTableClient.setFullUserName(obTableClientKey.getFullUserName());
+ obTableClient.setPassword(obTableClientKey.getPassword());
+ obTableClient.setProperties(obTableClientKey.getProperties());
+ obTableClient.setRunningMode(ObTableClient.RunningMode.HBASE);
+ obTableClient.init();
+ OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient);
+ if (logger.isInfoEnabled()) {
+ logger.info("create ObTableClient success with {}", obTableClientKey);
+ }
+ }
+ } catch (Exception e) {
+ logger.error(LCD.convert("01-00009"), obTableClientKey, e);
+ throw new IOException("create ObTableClient error with " + obTableClientKey, e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey);
+ }
+
+ /**
+ * Get or create ObTableClient according to paramUrl info.
+ *
+ * @param conf
+ * @param paramUrl
+ * @param fullUserName
+ * @param password
+ * @return
+ * @throws IOException
+ */
+ public static ObTableClient getOrCreateObTableClient(Configuration conf, String paramUrl,
+ String fullUserName, String password)
+ throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("try to getOrCreateObTableClient with paramUrl:" + paramUrl
+ + ", fullUserName:" + fullUserName);
+ }
+ checkArgument(isNotBlank(paramUrl), HBASE_OCEANBASE_PARAM_URL + " is blank");
+ checkArgument(isNotBlank(fullUserName), HBASE_OCEANBASE_FULL_USER_NAME + " is blank");
+
+ ObTableClientKey obTableClientKey = new ObTableClientKey();
+ obTableClientKey.setParamUrl(paramUrl);
+ obTableClientKey.setFullUserName(fullUserName);
+ obTableClientKey.setPassword(password);
+ for (Property property : Property.values()) {
+ String value = conf.get(property.getKey());
+ if (value != null) {
+ obTableClientKey.getProperties().put(property.getKey(), value);
+ }
+ }
+ return getOrCreateObTableClient(obTableClientKey);
+ }
+
+ /**
+ * ObTableClientKey consists of the triplet: paramUrl, fullUserName and password.
+ */
+ public static class ObTableClientKey {
+
+ private String paramUrl;
+ private String fullUserName;
+ private String password;
+
+ private Properties properties = new Properties();
+
+ public String getParamUrl() {
+ return paramUrl;
+ }
+
+ public void setParamUrl(String paramUrl) {
+ this.paramUrl = paramUrl;
+ }
+
+ public String getFullUserName() {
+ return fullUserName;
+ }
+
+ public void setFullUserName(String fullUserName) {
+ this.fullUserName = fullUserName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ ObTableClientKey that = (ObTableClientKey) o;
+ return Objects.equal(paramUrl, that.paramUrl)
+ && Objects.equal(fullUserName, that.fullUserName)
+ && Objects.equal(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(paramUrl, fullUserName, password);
+ }
+
+ @Override
+ public String toString() {
+ return "ObTableClientKey{" + "paramUrl='" + paramUrl + '\'' + ", fullUserName='"
+ + fullUserName + '\'' + ", properties=" + properties + '}';
+ }
+ }
+
+ /**
+ * DdsObTableClientKey consists of the appDataSource triplet: appName, appDsName and version.
+ */
+ public static class DdsObTableClientKey {
+
+ private String appName;
+ private String appDsName;
+ private String version;
+
+ private Properties properties = new Properties();
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public void setAppName(String appName) {
+ this.appName = appName;
+ }
+
+ public String getAppDsName() {
+ return appDsName;
+ }
+
+ public void setAppDsName(String appDsName) {
+ this.appDsName = appDsName;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof DdsObTableClientKey))
+ return false;
+ DdsObTableClientKey that = (DdsObTableClientKey) o;
+ return Objects.equal(getAppName(), that.getAppName())
+ && Objects.equal(getAppDsName(), that.getAppDsName())
+ && Objects.equal(getVersion(), that.getVersion());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(getAppName(), getAppDsName(), getVersion());
+ }
+
+ @Override
+ public String toString() {
+ return "DdsObTableClientKey{" + "appName='" + appName + '\'' + ", appDsName='"
+ + appDsName + '\'' + ", version='" + version + '\'' + ", properties="
+ + properties + '}';
+ }
+ }
+
+ /**
+ * Get or create DdsObTableClient according to ddsObTableClientKey.
+ *
+ * @param ddsObTableClientKey
+ * @return
+ * @throws IOException
+ */
+ public static DdsObTableClient getOrCreateDdsObTableClient(DdsObTableClientKey ddsObTableClientKey)
+ throws IOException {
+ if (DDS_OB_TABLE_CLIENT_INSTANCE.get(ddsObTableClientKey) == null) {
+ ReentrantLock tmp = new ReentrantLock();
+ ReentrantLock lock = DDS_OB_TABLE_CLIENT_LOCK.putIfAbsent(ddsObTableClientKey, tmp);
+ lock = lock == null ? tmp : lock;
+ lock.lock();
+ try {
+ if (DDS_OB_TABLE_CLIENT_INSTANCE.get(ddsObTableClientKey) == null) {
+ DdsObTableClient ddsObTableClient = new DdsObTableClient();
+ ddsObTableClient.setAppName(ddsObTableClientKey.getAppName());
+ ddsObTableClient.setAppDsName(ddsObTableClientKey.getAppDsName());
+ ddsObTableClient.setVersion(ddsObTableClientKey.getVersion());
+ ddsObTableClient.setRunningMode(ObTableClient.RunningMode.HBASE);
+ ddsObTableClient.setTableClientProperty(ddsObTableClientKey.getProperties());
+ ddsObTableClient.init();
+ DDS_OB_TABLE_CLIENT_INSTANCE.put(ddsObTableClientKey, ddsObTableClient);
+ logger.info("create DdsObTableClient success with {}", ddsObTableClientKey);
+ }
+ } catch (Exception e) {
+ logger.error(LCD.convert("01-00010"), ddsObTableClientKey, e);
+ throw new IOException("create DdsObTableClient error with " + ddsObTableClientKey,
+ e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return DDS_OB_TABLE_CLIENT_INSTANCE.get(ddsObTableClientKey);
+ }
+
+ /**
+ * Get or create DdsObTableClient according to appDataSource.
+ *
+ * @param conf
+ * @return
+ * @throws IllegalArgumentException
+ * @throws IOException
+ */
+ public static DdsObTableClient getOrCreateDdsObTableClient(Configuration conf)
+ throws IllegalArgumentException,
+ IOException {
+ String appName = conf.get(HBASE_OCEANBASE_DDS_APP_NAME);
+ String appDsName = conf.get(HBASE_OCEANBASE_DDS_APP_DS_NAME);
+ String version = conf.get(HBASE_OCEANBASE_DDS_VERSION);
+ if (logger.isDebugEnabled()) {
+ logger.debug("try to getOrCreateDdsObTableClient with appName:" + appName
+ + ", appDsName:" + appDsName + ", version:" + version);
+ }
+ checkArgument(isNotBlank(appName), HBASE_OCEANBASE_DDS_APP_NAME + " is blank");
+ checkArgument(isNotBlank(appDsName), HBASE_OCEANBASE_DDS_APP_DS_NAME + " is blank");
+ checkArgument(isNotBlank(version), HBASE_OCEANBASE_DDS_VERSION + " is blank");
+
+ DdsObTableClientKey ddsObTableClientKey = new DdsObTableClientKey();
+ ddsObTableClientKey.setAppName(appName);
+ ddsObTableClientKey.setAppDsName(appDsName);
+ ddsObTableClientKey.setVersion(version);
+
+ for (Property property : Property.values()) {
+ String value = conf.get(property.getKey());
+ if (value != null) {
+ ddsObTableClientKey.getProperties().put(property.getKey(), value);
+ }
+ }
+ return getOrCreateDdsObTableClient(ddsObTableClientKey);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java b/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java
index 058d05e7..8e90ecff 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java
@@ -25,4 +25,14 @@ public class KeyDefiner {
public static String genPooledOHTableAttributeName(String tableName, String key) {
return tableName + OHConstants.HBASE_HTABLE_POOL_SEPERATOR + key;
}
+ public static String[] parsePoolOHTableAttributeName(String key) {
+ if (key == null) {
+ return null;
+ }
+ String[] parsedKey = key.split("\\" + OHConstants.HBASE_HTABLE_POOL_SEPERATOR);
+ if (parsedKey.length != 2) {
+ return null;
+ }
+ return parsedKey;
+ }
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
index 8d42daa4..7ed4c3ca 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java
@@ -92,6 +92,17 @@ public HTableInterface createHTableInterface(Configuration config, byte[] tableN
}
private Configuration adjustConfiguration(Configuration configuration, String tableName) {
+ // Check if DDS mode is configured
+ String ddsAppName = configuration.get(HBASE_OCEANBASE_DDS_APP_NAME);
+ String ddsAppDsName = configuration.get(HBASE_OCEANBASE_DDS_APP_DS_NAME);
+ String ddsVersion = configuration.get(HBASE_OCEANBASE_DDS_VERSION);
+
+ if (isNotBlank(ddsAppName) && isNotBlank(ddsAppDsName) && isNotBlank(ddsVersion)) {
+ // DDS mode - no additional parameter validation needed
+ // The DDS parameters are already set in configuration
+ return configuration;
+ }
+
byte[] isOdpModeAttr = tablePool.getTableAttribute(tableName, HBASE_OCEANBASE_ODP_MODE);
if ((isOdpModeAttr != null && Bytes.toBoolean(isOdpModeAttr))
|| (isOdpModeAttr == null && configuration.getBoolean(HBASE_OCEANBASE_ODP_MODE, false))) {
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java b/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java
index f90b5d0a..a66da1f6 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.helpers.NOPLogger;
@InterfaceAudience.Private
public class TableHBaseLoggerFactory {
@@ -28,6 +29,8 @@ public class TableHBaseLoggerFactory {
public static final String TABLE_HBASE_LOGGER_SPACE = "oceanbase-table-hbase";
public static LogCode2Description LCD = LogCode2Description
.create(TABLE_HBASE_LOGGER_SPACE);
+ public static Logger MONITOR = NOPLogger.NOP_LOGGER;
+ public static Logger logger = NOPLogger.NOP_LOGGER;
public static Logger getLogger(String name) {
if (name == null || name.isEmpty()) {
diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableDDSTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableDDSTest.java
new file mode 100644
index 00000000..56365368
--- /dev/null
+++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableDDSTest.java
@@ -0,0 +1,832 @@
+/*-
+ * #%L
+ * OceanBase Table Hbase Framework
+ * %%
+ * Copyright (C) 2016 - 2021 Ant Financial Services Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
+import static org.apache.hadoop.hbase.util.Bytes.toBytes;
+
+public class OHTableDDSTest {
+ private String APP_NAME = "obkv";
+ private String APP_DS_NAME_4x = "obkv4_adapt_dds_client_test";
+ private String APP_DS_NAME_2x = "obkv4_adapt_dds_client_test_2x";
+ private String VERSION = "v1.0";
+ private String APP_DS_NAME = APP_DS_NAME_2x;
+ private String hTableName = APP_DS_NAME.equals(APP_DS_NAME_4x) ? "test" : "testt";
+
+ protected OHTablePool ohTablePool;
+
+ @Before
+ public void setup() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ ohTablePool = new OHTablePool(conf, 10, PoolMap.PoolType.Reusable);
+ ohTablePool.setRuntimeBatchExecutor("testt", Executors.newFixedThreadPool(33));
+ ohTablePool.init();
+ }
+
+ /*
+ *
+ create table if not exists test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test';
+
+ create table if not exists test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test';
+
+ create table if not exists test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test';
+
+ */
+
+ /**
+ * TestCase: 分表路由到单个分区,表为 test$family_00.
+ * 扩展测试:覆盖00-09所有分区的单分区测试
+ */
+ @Test
+ public void testSharding00() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ String value = "value";
+ long timestamp = System.currentTimeMillis();
+
+ // 测试所有分区 00-09
+ for (int partition = 0; partition <= 9; partition++) {
+ String partitionPrefix = String.format("%02d", partition);
+ String key1 = partitionPrefix + "_TESTKEY0";
+ String key2 = partitionPrefix + "_TESTKEY1";
+ String key3 = partitionPrefix + "_TESTKEY2";
+
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+
+ try {
+ // 测试单个分区内的操作
+ Put put = new Put(toBytes(key1));
+ put.add(family.getBytes(), column1.getBytes(), timestamp,
+ toBytes(value + "_" + partitionPrefix));
+ hTable.put(put);
+ Get get = new Get(toBytes(key1));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+
+ put = new Put(toBytes(key2));
+ put.add(family.getBytes(), column1.getBytes(), timestamp,
+ toBytes(value + "_" + partitionPrefix));
+ hTable.put(put);
+
+ put = new Put(toBytes(key3));
+ put.add(family.getBytes(), column1.getBytes(), timestamp,
+ toBytes(value + "_" + partitionPrefix));
+ hTable.put(put);
+
+ // 同分区内的scan应该成功
+ Scan scan = new Scan();
+ scan.addColumn(family.getBytes(), column1.getBytes());
+ scan.setStartRow(toBytes(key1));
+ scan.setStopRow(toBytes(key3));
+ scan.setMaxVersions(9);
+ ResultScanner scanner = hTable.getScanner(scan);
+
+ int count = 0;
+ for (Result result : scanner) {
+ for (KeyValue keyValue : result.raw()) {
+ System.out.println("Partition " + partitionPrefix + " - rowKey: "
+ + new String(keyValue.getRow()) + " columnQualifier:"
+ + new String(keyValue.getQualifier()) + " timestamp:"
+ + keyValue.getTimestamp() + " value:"
+ + new String(keyValue.getValue()));
+ count++;
+ }
+ }
+ scanner.close();
+ Assert.assertEquals(2, count);
+
+ } finally {
+ // 清理数据
+ Delete delete = new Delete(toBytes(key1));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ delete = new Delete(toBytes(key2));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ delete = new Delete(toBytes(key3));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ }
+ }
+ }
+
+ /**
+ * TestCase: 分表路由到多个分区,分表为 test$family_{00-09}.
+ * 扩展测试:覆盖所有00-09分区的跨分区测试
+ */
+ @Test
+ public void testSharding01() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ // 扩展到覆盖所有分区 00-09
+ String key[] = new String[10];
+ for (int i = 0; i < 10; i++) {
+ key[i] = String.format("%02dTEST_KEY%d", i, i);
+ }
+ String value = "value";
+ long timestamp = System.currentTimeMillis();
+
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+
+ try {
+ // 测试所有分区的写入和读取
+ for (int i = 0; i < key.length; i++) {
+ Put put = new Put(toBytes(key[i]));
+ put.add(family.getBytes(), column1.getBytes(), timestamp,
+ toBytes(value + "_partition_" + String.format("%02d", i)));
+ hTable.put(put);
+ Get get = new Get(toBytes(key[i]));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+ System.out.println("Successfully wrote and read from partition "
+ + String.format("%02d", i));
+ }
+
+ // 测试单个分区内的scan(应该成功)
+ for (int partition = 0; partition < 10; partition++) {
+ String partitionPrefix = String.format("%02d", partition);
+ Scan scan = new Scan();
+ scan.addColumn(family.getBytes(), column1.getBytes());
+ scan.setStartRow(toBytes(partitionPrefix + "TEST_KEY" + partition));
+ scan.setStopRow(toBytes(partitionPrefix + "TEST_KEY" + partition + "Z"));
+ scan.setMaxVersions(9);
+ ResultScanner scanner = hTable.getScanner(scan);
+
+ int count = 0;
+ for (Result result : scanner) {
+ for (KeyValue keyValue : result.raw()) {
+ System.out.println("Partition " + partitionPrefix + " scan - rowKey: "
+ + new String(keyValue.getRow()) + " value: "
+ + new String(keyValue.getValue()));
+ count++;
+ }
+ }
+ scanner.close();
+ Assert.assertEquals(1, count);
+ }
+
+ // 跨 partition 的 Scan 是不支持的 - 测试多个跨分区场景
+ int[][] crossPartitionTests = { { 0, 2 }, { 1, 4 }, { 5, 8 }, { 3, 9 } };
+ for (int[] testCase : crossPartitionTests) {
+ try {
+ Scan scan = new Scan();
+ scan.addColumn(family.getBytes(), column1.getBytes());
+ scan.setStartRow(toBytes(key[testCase[0]]));
+ scan.setStopRow(toBytes(key[testCase[1]]));
+ scan.setMaxVersions(9);
+ hTable.getScanner(scan);
+ Assert.fail("Expected exception for cross-partition scan from " + testCase[0]
+ + " to " + testCase[1]);
+ } catch (Exception e) {
+ System.out.println("Cross-partition scan correctly failed: " + testCase[0]
+ + " to " + testCase[1]);
+ Assert.assertTrue("Cross-partition scan should fail", true);
+ }
+ }
+ } finally {
+ for (int i = 0; i < key.length; i++) {
+ Delete delete = new Delete(toBytes(key[i]));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ }
+ hTable.close();
+ }
+ }
+
+ @Test
+ public void testAppendIncrement() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ String key1 = "00_TESTKEY0";
+ String value = "value";
+ String appendValue = "appendValue";
+ String key2 = "00_TESTKEY1";
+ long incrValue = 100L;
+
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+
+ try {
+ long timestamp = System.currentTimeMillis();
+ Put put = new Put(toBytes(key1));
+ put.add(family.getBytes(), column1.getBytes(), timestamp, toBytes(value));
+ hTable.put(put);
+
+ Get get = new Get(toBytes(key1));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+
+ Assert.assertEquals(value,
+ new String(r.getValue(family.getBytes(), column1.getBytes())));
+
+ Append append = new Append(toBytes(key1));
+ append.add(family.getBytes(), column1.getBytes(), toBytes(appendValue));
+ hTable.append(append);
+ r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+ Assert.assertEquals(value + appendValue,
+ new String(r.getValue(family.getBytes(), column1.getBytes())));
+
+ Increment increment = new Increment(toBytes(key2));
+ increment.addColumn(family.getBytes(), column1.getBytes(), incrValue);
+ hTable.increment(increment);
+ get = new Get(toBytes(key2));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+ System.out.println(Arrays.toString(r.getValue(family.getBytes(), column1.getBytes())));
+ long actualValue = Bytes.toLong(r.getValue(family.getBytes(), column1.getBytes()));
+ Assert.assertEquals(incrValue, actualValue);
+ } finally {
+ Delete delete = new Delete(toBytes(key1));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ delete = new Delete(toBytes(key2));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ hTable.close();
+ }
+ }
+
+ @Test
+ public void testCheckAndPut() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ String key1 = "00_TESTKEY0";
+ String value = "value";
+ String newValue = "newValue";
+ long timestamp = System.currentTimeMillis();
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+ try {
+ Put put = new Put(key1.getBytes());
+ put.add(family.getBytes(), column1.getBytes(), value.getBytes());
+ hTable.put(put);
+ Get get = new Get(key1.getBytes());
+ get.addColumn(family.getBytes(), column1.getBytes());
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+ put = new Put(key1.getBytes());
+ put.add(family.getBytes(), column1.getBytes(), newValue.getBytes());
+ boolean ret = hTable.checkAndPut(key1.getBytes(), family.getBytes(),
+ column1.getBytes(), value.getBytes(), put);
+ Assert.assertTrue(ret);
+ r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+ Assert.assertEquals(newValue,
+ new String(r.getValue(family.getBytes(), column1.getBytes())));
+ } finally {
+ Delete delete = new Delete(key1.getBytes());
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ hTable.close();
+ }
+ }
+
+ /**
+ * TestCase: 跨数据库测试 - 测试不同数据库 group_00-group_09 的表操作
+ * 每个数据库包含 test$family_00-test$family_09 表,覆盖所有分区
+ */
+ @Test
+ public void testCrossDatabase() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ String value = "cross_db_value";
+
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+
+ List testKeys = new ArrayList<>();
+ try {
+ // 测试跨数据库和跨表操作:使用key前缀路由到不同的数据库group_XX和表test$family_XX
+ for (int dbIndex = 0; dbIndex <= 9; dbIndex++) {
+ for (int tableIndex = 0; tableIndex <= 9; tableIndex++) {
+ // 使用特定格式的key确保路由到正确的数据库和表
+ // 格式:dbIndex_tableIndex_... 来路由到 group_dbIndex 数据库的 test$family_tableIndex 表
+ String key = String.format("%02d_%02d_CROSS_DB_TABLE_KEY", dbIndex, tableIndex);
+ testKeys.add(key);
+
+ // 写入数据
+ Put put = new Put(toBytes(key));
+ put.add(family.getBytes(), column1.getBytes(),
+ toBytes(value + "_db" + dbIndex + "_table" + tableIndex));
+ hTable.put(put);
+
+ // 验证写入
+ Get get = new Get(toBytes(key));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+
+ String expectedValue = value + "_db" + dbIndex + "_table" + tableIndex;
+ String actualValue = new String(r.getValue(family.getBytes(), column1.getBytes()));
+ Assert.assertEquals(expectedValue, actualValue);
+
+ System.out.println("Cross-DB-Table test: Successfully wrote/read group_" +
+ String.format("%02d", dbIndex) + ".test$family_" + String.format("%02d", tableIndex) +
+ " with key: " + key);
+ }
+ }
+
+ // 测试同一分区内的批量操作(避免跨分区批量操作)
+ // 只在一个特定的数据库表组合中进行批量操作
+ int testDbIndex = 0;
+ int testTableIndex = 0;
+ List sameBatchPuts = new ArrayList<>();
+ for (int i = 0; i < 5; i++) { // 在同一个分区中创建5条记录
+ String batchKey = String.format("%02d_%02d_SAME_BATCH_KEY_%d", testDbIndex, testTableIndex, i);
+ testKeys.add(batchKey);
+
+ Put put = new Put(toBytes(batchKey));
+ put.add(family.getBytes(), column1.getBytes(),
+ toBytes("same_batch_value_" + i + "_db" + testDbIndex + "_table" + testTableIndex));
+ sameBatchPuts.add(put);
+ }
+
+ // 执行同分区批量写入
+ hTable.put(sameBatchPuts);
+ System.out.println("Same-partition batch: Completed batch put of " + sameBatchPuts.size() +
+ " records in group_" + String.format("%02d", testDbIndex) + ".test$family_" + String.format("%02d", testTableIndex));
+
+ // 验证批量写入的数据
+ for (int i = 0; i < 5; i++) {
+ String batchKey = String.format("%02d_%02d_SAME_BATCH_KEY_%d", testDbIndex, testTableIndex, i);
+ Get get = new Get(toBytes(batchKey));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+
+ String expectedValue = "same_batch_value_" + i + "_db" + testDbIndex + "_table" + testTableIndex;
+ String actualValue = new String(r.getValue(family.getBytes(), column1.getBytes()));
+ Assert.assertEquals(expectedValue, actualValue);
+ }
+ System.out.println("Same-partition batch verification completed successfully");
+
+ } finally {
+ // 清理数据
+ for (String key : testKeys) {
+ Delete delete = new Delete(toBytes(key));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ }
+ hTable.close();
+ }
+ }
+
+ /**
+ * TestCase: 跨表测试 - 测试同一数据库内不同表的操作
+ * 在每个数据库 group_XX 中测试 test$family_00 到 test$family_09 的表操作
+ */
+ @Test
+ public void testCrossTable() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ String value = "cross_table_value";
+
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+
+ List testKeys = new ArrayList<>();
+
+ try {
+ // 测试跨表操作:通过不同的key前缀路由到不同的表 test$family_XX
+ // 这里只测试同一个数据库内的跨表操作
+ for (int tableIndex = 0; tableIndex <= 9; tableIndex++) {
+ for (int recordIndex = 0; recordIndex < 3; recordIndex++) { // 每个表3条记录
+ // 使用特定格式确保路由到正确的表
+ // 格式:00_tableIndex_... 来路由到 group_00 数据库的 test$family_tableIndex 表
+ String key = String.format("00_%02d_TABLE_KEY_%d", tableIndex, recordIndex);
+ testKeys.add(key);
+
+ // 写入数据
+ Put put = new Put(toBytes(key));
+ put.add(family.getBytes(), column1.getBytes(),
+ toBytes(value + "_table" + tableIndex + "_record" + recordIndex));
+ hTable.put(put);
+
+ // 验证写入
+ Get get = new Get(toBytes(key));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+
+ String expectedValue = value + "_table" + tableIndex + "_record" + recordIndex;
+ String actualValue = new String(r.getValue(family.getBytes(), column1.getBytes()));
+ Assert.assertEquals(expectedValue, actualValue);
+
+ System.out.println("Cross-table test: Successfully wrote/read group_00.test$family_" +
+ String.format("%02d", tableIndex) + " record " + recordIndex + " with key: " + key);
+ }
+ }
+
+ // 测试跨表的条件操作
+ for (int tableIndex = 0; tableIndex < 5; tableIndex++) { // 测试前5个表
+ String checkKey = String.format("00_%02d_CHECK_PUT", tableIndex);
+ testKeys.add(checkKey);
+
+ Put initialPut = new Put(toBytes(checkKey));
+ initialPut.add(family.getBytes(), column1.getBytes(), toBytes("initial"));
+ hTable.put(initialPut);
+
+ Put updatePut = new Put(toBytes(checkKey));
+ updatePut.add(family.getBytes(), column1.getBytes(), toBytes("updated"));
+ boolean success = hTable.checkAndPut(toBytes(checkKey), family.getBytes(),
+ column1.getBytes(), toBytes("initial"), updatePut);
+ Assert.assertTrue("CheckAndPut should succeed for table " + tableIndex, success);
+
+ Get get = new Get(toBytes(checkKey));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ Result r = hTable.get(get);
+ Assert.assertEquals("updated",
+ new String(r.getValue(family.getBytes(), column1.getBytes())));
+
+ System.out.println("Cross-table CheckAndPut: Successfully updated table " + tableIndex);
+ }
+
+ } finally {
+ for (String key : testKeys) {
+ try {
+ Delete delete = new Delete(toBytes(key));
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ } catch (Exception e) {
+ }
+ }
+ hTable.close();
+ }
+ }
+
+ /**
+ * TestCase: 批量操作跨分区测试
+ * 测试批量写入和读取跨多个分区的数据
+ */
+ @Test
+ public void testBatchOperationsAcrossPartitions() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ String value = "batch_value";
+ long timestamp = System.currentTimeMillis();
+
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+
+ List testKeys = new ArrayList<>();
+
+ try {
+ int totalSuccessCount = 0;
+ for (int partition = 0; partition <= 9; partition++) {
+ List partitionPuts = new ArrayList<>();
+ List partitionGets = new ArrayList<>();
+ List partitionKeys = new ArrayList<>();
+
+ for (int i = 0; i < 3; i++) { // 每个分区3条数据
+ String key = String.format("%02d_BATCH_KEY_%d_%d", partition, partition, i);
+ partitionKeys.add(key);
+ testKeys.add(key);
+
+ Put put = new Put(toBytes(key));
+ put.add(family.getBytes(), column1.getBytes(),
+ toBytes(value + "_" + partition + "_" + i));
+ partitionPuts.add(put);
+
+ Get get = new Get(toBytes(key));
+ get.addColumn(family.getBytes(), toBytes(column1));
+ partitionGets.add(get);
+ }
+
+ hTable.put(partitionPuts);
+ System.out.println("Batch put completed for partition " + String.format("%02d", partition) +
+ ": " + partitionPuts.size() + " records");
+
+ Result[] results = hTable.get(partitionGets);
+ Assert.assertEquals(partitionGets.size(), results.length);
+
+ int partitionSuccessCount = 0;
+ for (int i = 0; i < results.length; i++) {
+ Result result = results[i];
+ if (result != null && result.raw().length > 0) {
+ String expectedValue = value + "_" + partition + "_" + i;
+ String actualValue = new String(result.getValue(family.getBytes(), column1.getBytes()));
+ Assert.assertEquals(expectedValue, actualValue);
+ partitionSuccessCount++;
+ }
+ }
+ Assert.assertEquals("All batch reads should succeed for partition " + partition,
+ partitionKeys.size(), partitionSuccessCount);
+ totalSuccessCount += partitionSuccessCount;
+
+ System.out.println("Batch get completed for partition " + String.format("%02d", partition) +
+ ": " + partitionSuccessCount + " records verified");
+ }
+
+ System.out.println("Total batch operations completed: " + totalSuccessCount + " records across all partitions");
+
+ } finally {
+ for (int partition = 0; partition <= 9; partition++) {
+ List partitionDeletes = new ArrayList<>();
+ String partitionPrefix = String.format("%02d_", partition);
+
+ for (String key : testKeys) {
+ if (key.startsWith(partitionPrefix)) {
+ Delete delete = new Delete(toBytes(key));
+ delete.deleteFamily(family.getBytes());
+ partitionDeletes.add(delete);
+ }
+ }
+
+ if (!partitionDeletes.isEmpty()) {
+ hTable.delete(partitionDeletes);
+ System.out.println("Cleaned up " + partitionDeletes.size() + " records from partition " +
+ String.format("%02d", partition));
+ }
+ }
+ hTable.close();
+ }
+ }
+
+ @Test
+ public void testCheckAndDelete() throws Exception {
+ String family = "family";
+ String column1 = "column1";
+ String key1 = "00_TESTDELETEKEY";
+ String key2 = "00_TESTDELETEKEY2";
+ String value = "value";
+ String newValue = "newValue";
+ long timestamp = System.currentTimeMillis();
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = ohTablePool.getTable(hTableName);
+
+ try {
+ Put put = new Put(key1.getBytes());
+ put.add(family.getBytes(), column1.getBytes(), value.getBytes());
+ hTable.put(put);
+ Get get = new Get(key1.getBytes());
+ get.addColumn(family.getBytes(), column1.getBytes());
+ Result r = hTable.get(get);
+ Assert.assertEquals(1, r.raw().length);
+ Delete delete = new Delete(key1.getBytes());
+ delete.deleteColumn(family.getBytes(), column1.getBytes());
+ boolean ret = hTable.checkAndDelete(key1.getBytes(), family.getBytes(),
+ column1.getBytes(), value.getBytes(), delete);
+ Assert.assertTrue(ret);
+ r = hTable.get(get);
+ Assert.assertEquals(0, r.raw().length);
+ } finally {
+ Delete delete = new Delete(key1.getBytes());
+ delete.deleteFamily(family.getBytes());
+ hTable.delete(delete);
+ hTable.close();
+ }
+ }
+
+ /*
+ create tablegroup if not exists test_multicf SHARDING = 'ADAPTIVE';
+ create table if not exists test_multicf$family1_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+
+ create table if not exists test_multicf$family1_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+
+ create table if not exists test_multicf$family1_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+
+ create table if not exists test_multicf$family1_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+
+ create table if not exists test_multicf$family2_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+
+ create table if not exists test_multicf$family2_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+
+ create table if not exists test_multicf$family2_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+
+ create table if not exists test_multicf$family2_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+ ) TABLEGROUP = 'test_multicf';
+ */
+
+ /**
+ * TestCase: 多CF功能测试, 每个CF进行分表
+ */
+ @Ignore
+ public void testSharding02() throws Exception {
+ String tableName = "test_multicf";
+ String family1 = "family1";
+ String family2 = "family2";
+ String column1 = "column1";
+ List keyList = new ArrayList<>();
+ for (int partition = 0; partition <= 9; partition++) {
+ for (int i = 0; i < 2; i++) { // 每个分区2个key
+ keyList.add(String.format("%02dTEST_KEY%d_%d", partition, partition, i));
+ }
+ }
+ String[] key = keyList.toArray(new String[0]);
+ String value = "value";
+
+ Configuration conf = new Configuration();
+ conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME);
+ conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION);
+ HTableInterface hTable = new OHTable(conf, tableName);
+
+ try {
+ for (int i = 0; i < key.length; i++) {
+ Put put = new Put(toBytes(key[i]));
+ put.addColumn(family1.getBytes(), (column1 + family1).getBytes(), toBytes(value + key[i]
+ + family1));
+ put.addColumn(family2.getBytes(), (column1 + family2).getBytes(), toBytes(value + key[i]
+ + family2));
+ hTable.put(put);
+ System.out.println("Multi-CF: Successfully wrote key " + key[i] + " to both families");
+ }
+
+ for (int i = 0; i < key.length; i++) {
+ Get get = new Get(toBytes(key[i]));
+ get.addFamily(family1.getBytes());
+ get.addFamily(family2.getBytes());
+ Result result = hTable.get(get);
+ Assert.assertEquals(2, result.raw().length);
+ System.out.println("Multi-CF: Successfully read key " + key[i] + " from both families");
+ }
+
+ List> scans = new ArrayList<>();
+ for (int partition = 0; partition <= 9; partition++) {
+ String startKey = String.format("%02dA", partition);
+ String endKey = String.format("%02dZ", partition);
+ scans.add(new Pair<>(toBytes(startKey), toBytes(endKey)));
+ }
+ for (int partition = 0; partition < scans.size(); partition++) {
+ Pair scan = scans.get(partition);
+
+ Scan s = new Scan(scan.getFirst(), scan.getSecond());
+ s.addFamily(family1.getBytes());
+ ResultScanner scanner = hTable.getScanner(s);
+ int count1 = 0;
+ for (Result result : scanner) {
+ System.out.println("Partition " + String.format("%02d", partition) +
+ " Family1 Forward Scan result: " + Arrays.toString(result.raw()));
+ count1++;
+ }
+ scanner.close();
+
+ s = new Scan(scan.getFirst(), scan.getSecond());
+ s.addFamily(family2.getBytes());
+ scanner = hTable.getScanner(s);
+ int count2 = 0;
+ for (Result result : scanner) {
+ System.out.println("Partition " + String.format("%02d", partition) +
+ " Family2 Forward Scan result: " + Arrays.toString(result.raw()));
+ count2++;
+ }
+ scanner.close();
+
+ Assert.assertEquals("Partition " + partition + " should have data in family1", count1, count2);
+ }
+
+ for (int partition = 0; partition < Math.min(5, scans.size()); partition++) {
+ Pair scan = scans.get(partition);
+
+ Scan s = new Scan(scan.getSecond(), scan.getFirst());
+ s.addFamily(family1.getBytes());
+ s.setReversed(true);
+ ResultScanner scanner = hTable.getScanner(s);
+ for (Result result : scanner) {
+ System.out.println("Partition " + String.format("%02d", partition) +
+ " Family1 Reverse Scan result: " + Arrays.toString(result.raw()));
+ }
+ scanner.close();
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("testSharding02 failed: " + e.getMessage());
+ } finally {
+ for (int i = 0; i < key.length; i++) {
+ Delete delete = new Delete(toBytes(key[i]));
+ delete.addFamily(family1.getBytes());
+ delete.addFamily(family2.getBytes());
+ hTable.delete(delete);
+ }
+ }
+
+ hTable.close();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/dds_unit_test_db.sql b/src/test/java/dds_unit_test_db.sql
new file mode 100644
index 00000000..0c418684
--- /dev/null
+++ b/src/test/java/dds_unit_test_db.sql
@@ -0,0 +1,786 @@
+create database if not exists group_00;
+create database if not exists group_01;
+create database if not exists group_02;
+create database if not exists group_03;
+create database if not exists group_04;
+create database if not exists group_05;
+create database if not exists group_06;
+create database if not exists group_07;
+create database if not exists group_08;
+create database if not exists group_09;
+
+create tablegroup test SHARDING = 'ADAPTIVE';
+
+create table if not exists group_00.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_00.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_00.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_00.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_00.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_00.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_00.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_00.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_00.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_00.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+
+
+create table if not exists group_01.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_01.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_01.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_01.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_01.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_01.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_01.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_01.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_01.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_01.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_02.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_03.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_03.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_03.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_03.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_03.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_03.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_03.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_03.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_03.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_03.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_04.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_04.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_04.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_04.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_04.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_04.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_04.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_04.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_04.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_04.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_05.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_05.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_05.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_05.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_05.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_05.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_05.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_05.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_05.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_05.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_06.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_06.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_06.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_06.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_06.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_06.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_06.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_06.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_06.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_06.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_07.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_07.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_07.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_07.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_07.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_07.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_07.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_07.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_07.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_07.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_08.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_08.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_08.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_08.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_08.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_08.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_08.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_08.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_08.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_08.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_09.test$family_00(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_09.test$family_01(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_09.test$family_02(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_09.test$family_03(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_09.test$family_04(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_09.test$family_05(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_09.test$family_06(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_09.test$family_07(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+
+create table if not exists group_09.test$family_08(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
+create table if not exists group_09.test$family_09(
+ K varbinary(1024) NOT NULL,
+ Q varbinary(256) NOT NULL,
+ T bigint(20) NOT NULL,
+ V varbinary(1024) NOT NULL,
+ PRIMARY KEY (K, Q, T)
+) TABLEGROUP = 'test';
| |