diff --git a/pom.xml b/pom.xml index c583dcd2..7032308a 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,7 @@ ${project.encoding} UTF-8 1.7.21 - 1.4.0 + 1.4.3.1-SNAPSHOT @@ -125,10 +125,70 @@ objenesis 3.0.1 + + com.google.code.findbugs + jsr305 + 3.0.2 + + + com.taobao.remoting + network.core + 1.2.5 + + + com.alibaba + fastjson + 1.2.68.noneautotype + + + com.google.code.gson + gson + 2.8.6 + + + com.google.errorprone + error_prone_annotations + 2.3.4 + + + com.alipay.sofa + bolt + 1.6.5 + + + commons-httpclient + commons-httpclient + 3.1 + + + com.alibaba.toolkit.common + toolkit-common-lang + 1.1.5 + + + commons-io + commons-io + 2.5 + + + io.grpc + grpc-netty-shaded + 1.34.1 + + + com.alipay.common + tracer + 1.0.39 + + + mysql + mysql-connector-java + 5.1.40 + org.apache.hadoop hadoop-common @@ -184,6 +244,12 @@ com.oceanbase obkv-table-client ${table.client.version} + + + mysql-connector-java + mysql + + org.apache.hbase @@ -272,6 +338,14 @@ Sonatype Snapshot Repository https://s01.oss.sonatype.org/content/repositories/snapshots/ + + + true + + ant-artifactory + Alipay Artifactory + http://mvn.test.alipay.net/artifactory/content/groups/public/ + diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index f0ae53e6..86681cdf 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -25,6 +25,9 @@ import com.alipay.oceanbase.hbase.util.*; import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.hbase.util.ExecuteAbleManager; +import com.alipay.oceanbase.rpc.OperationExecuteAble; +import com.alipay.oceanbase.rpc.exception.ExceptionUtil; import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.location.model.partition.Partition; import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; @@ -67,6 +70,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; @@ -92,6 +96,24 @@ public class OHTable implements HTableInterface { */ private final ObTableClient obTableClient; + + /** + * === the operation executable for DDS support === + */ + private OperationExecuteAble executeAble; + private List qualifierWithNum = new ArrayList<>(); + + private String familyString; + + private String currentTableFamily; + + private boolean hTableQueryOptimize = false; + private boolean localHTableQueryOptimize = false; + private boolean localGetByBatch = false; + /** + * === the operation executable for DDS support end === + */ + /** * the ohTable name in byte array */ @@ -206,14 +228,29 @@ public OHTable(Configuration configuration, String tableName) throws IOException long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME, DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME); this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime); - OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); - int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( - this.tableNameString, ohConnectionConf)); - this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); - this.obTableClient.setRuntimeRetryTimes(numRetries); - setOperationTimeout(ohConnectionConf.getOperationTimeout()); + + // Check if DDS configuration is present + String ddsAppName = configuration.get(HBASE_OCEANBASE_DDS_APP_NAME); + String ddsAppDsName = configuration.get(HBASE_OCEANBASE_DDS_APP_DS_NAME); + String ddsVersion = configuration.get(HBASE_OCEANBASE_DDS_VERSION); + + if (isNotBlank(ddsAppName) && isNotBlank(ddsAppDsName) && isNotBlank(ddsVersion)) { + // Use DDS mode with ExecuteAbleManager + this.obTableClient = null; // Not used in DDS mode + this.executeAble = ExecuteAbleManager.getOrCreateExecuteAble(configuration); + } else { + OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration( + configuration); + int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.obTableClient = ObTableClientManager + .getOrCreateObTableClient(setUserDefinedNamespace(this.tableNameString, + ohConnectionConf)); + this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); + this.obTableClient.setRuntimeRetryTimes(numRetries); + setOperationTimeout(ohConnectionConf.getOperationTimeout()); + this.executeAble = null; + } finishSetUp(); } @@ -259,15 +296,28 @@ public OHTable(Configuration configuration, final byte[] tableName, this.tableNameString = Bytes.toString(tableName); this.executePool = executePool; this.cleanupPoolOnClose = false; - OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); - int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( - this.tableNameString, ohConnectionConf)); - this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); - this.obTableClient.setRuntimeRetryTimes(numRetries); - setOperationTimeout(ohConnectionConf.getOperationTimeout()); + // Check if DDS configuration is present + if (configuration == null) { + throw new IllegalArgumentException("configuration is null."); + } + String ddsAppName = configuration.get(HBASE_OCEANBASE_DDS_APP_NAME); + String ddsAppDsName = configuration.get(HBASE_OCEANBASE_DDS_APP_DS_NAME); + String ddsVersion = configuration.get(HBASE_OCEANBASE_DDS_VERSION); + if (isNotBlank(ddsAppName) && isNotBlank(ddsAppDsName) && isNotBlank(ddsVersion)) { + this.executeAble = ExecuteAbleManager.getOrCreateExecuteAble(configuration); + this.obTableClient = null; // Not used in DDS mode + } else { + OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); + int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( + this.tableNameString, ohConnectionConf)); + this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); + this.obTableClient.setRuntimeRetryTimes(numRetries); + setOperationTimeout(ohConnectionConf.getOperationTimeout()); + this.executeAble = null; + } finishSetUp(); } @@ -295,6 +345,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient, this.closeClientOnClose = false; this.executePool = executePool; this.obTableClient = obTableClient; + this.executeAble = null; this.configuration = new Configuration(); finishSetUp(); } @@ -338,7 +389,8 @@ public OHTable(TableName tableName, Connection connection, this.obTableClient.setRpcExecuteTimeout(rpcTimeout); this.obTableClient.setRuntimeRetryTimes(numRetries); setOperationTimeout(operationTimeout); - + this.executeAble = this.obTableClient; + finishSetUp(); } @@ -493,6 +545,52 @@ public Boolean[] exists(List gets) throws IOException { return objectResults; } + /** + * 将batch请求下所有请求的放在familyMap中。其中key为family,value是一个Pair类型:包含操作的索引值,以及操作内容 + * @param index 请求的索引值 + * @param innerFamilyMap 请求的一个pair容器 + * @param familyMap batch请求的容器 + */ + private void geneInnerFamilyMap(int index, Map> innerFamilyMap, + Map, List>> familyMap) { + + for (Map.Entry> entry : innerFamilyMap.entrySet()) { + String family = Bytes.toString(entry.getKey()); + Pair, List> keyValueWithIndex = familyMap.get(family); + if (keyValueWithIndex == null) { + keyValueWithIndex = new Pair, List>( + new ArrayList(), new ArrayList()); + familyMap.put(family, keyValueWithIndex); + } + keyValueWithIndex.getFirst().add(index); + keyValueWithIndex.getSecond().addAll(entry.getValue()); + } + } + /** + * 将batch请求下的get请求放在getFamilyMap中 + * @param index + * @param get + * @param getFamilyMap + */ + private void geneGetInnerFamilyMap(int index, Get get, Map, List>> getFamilyMap) { + int count = 0; + for (Map.Entry> entry : get.getFamilyMap().entrySet()) { + count++; + if (count > 1) { + throw new FeatureNotSupportedException("do not support multi family"); + } + String family = Bytes.toString(entry.getKey()); + Pair, List> getsWithIndex = getFamilyMap.get(family); + if (getsWithIndex == null) { + getsWithIndex = new Pair, List>(new ArrayList(), new ArrayList()); + getFamilyMap.put(family, getsWithIndex); + } + getsWithIndex.getFirst().add(index); + getsWithIndex.getSecond().add(get); + } + } + + private BatchOperation compatOldServerPut(final List actions, final Object[] results, BatchError batchError, int i, List puts) throws Exception { BatchOperationResult tmpResults; @@ -646,14 +744,19 @@ public void batch(final List actions, final Object[] results) thr } } BatchError batchError = new BatchError(); - obTableClient.setRuntimeBatchExecutor(executePool); + if (obTableClient != null) { + obTableClient.setRuntimeBatchExecutor(executePool); + } + OperationExecuteAble executeAble = obTableClient == null ? this.executeAble : obTableClient; List resultMapSingleOp = new LinkedList<>(); - if (!ObGlobal.isHBaseBatchSupport()) { + if (executeAble instanceof ObTableClient && !ObGlobal.isHBaseBatchSupport()) { try { compatOldServerBatch(actions, results, batchError); } catch (Exception e) { throw new IOException(e); } + } else if (obTableClient == null) { + batchForDDS(actions); } else { String realTableName = getTargetTableName(actions); BatchOperation batch = buildBatchOperation(realTableName, actions, @@ -701,6 +804,91 @@ public void batch(final List actions, final Object[] results) thr } } + /** + * Check if current table is in DDS mode + * @return true if in DDS mode (obTableClient is null and executeAble is not null) + */ + private boolean isDDSMode() { + return obTableClient == null && executeAble != null; + } + + /** + * 当batch请求中至少包含写入/删除/get中的2种类型的请求时,说明时混合的batch请求,返回true + * @param write 写入请求的数量 + * @param delete 删除请求的数量 + * @param get get请求的数量 + * @return + */ + private boolean isMix(int write, int delete, int get) { + int typeCount = (write > 0 ? 1 : 0) + (delete > 0 ? 1 : 0) + (get > 0 ? 1 : 0); + return typeCount >= 2; + } + + /** + * 批量的batch请求,目前支持put和delete以及get。 不支持复杂的get请求,即设置maxVersion、filter等限制,只可以获取最新的版本 + * @param actions + * @return + * @throws IOException + */ + public Object[] batchForDDS(List actions) throws IOException { + Map, List>> familyMap = new HashMap<>(); + Map, List>> getsFamilyMap = new HashMap<>(); + ArrayList writeBuffer = new ArrayList<>(); + ArrayList deleteBuffer = new ArrayList<>(); + ArrayList getBuffer = new ArrayList<>(); + int idx = -1; + for (Row action : actions) { + idx++; + if (action instanceof Put) { + Put aPut = (Put) action; + validatePut(aPut); + checkFamilyViolation(aPut.getFamilyMap().keySet()); + writeBuffer.add((Put) action); + currentWriteBufferSize += aPut.heapSize(); + geneInnerFamilyMap(idx, aPut.getFamilyMap(), familyMap); + } else if (action instanceof Delete) { + Delete aDelete = (Delete) action; + checkFamilyViolation(aDelete.getFamilyMap().keySet()); + deleteBuffer.add((Delete) action); + geneInnerFamilyMap(idx, aDelete.getFamilyMap(), familyMap); + } else if (action instanceof Get) { + Get aGet = (Get) action; + checkFamilyViolation(aGet.getFamilyMap().keySet()); + getBuffer.add((Get) action); + geneGetInnerFamilyMap(idx, aGet, getsFamilyMap); + } else { + throw new FeatureNotSupportedException("only support put, delete or get in a batch"); + } + } + + boolean[] resultSuccess = new boolean[actions.size()]; + Object[] getsResults = new Object[actions.size()]; + + // 先执行gets,再执行mutate,否则可能get的结果不一致 + for (Map.Entry, List>> entry : getsFamilyMap.entrySet()) { + Result[] tmpRes = get(entry.getValue().getSecond()); + int pos = 0; + for (Integer index : entry.getValue().getFirst()) { + resultSuccess[index] = true; + getsResults[index] = tmpRes[pos]; + pos++; + } + } + boolean isMix = isMix(writeBuffer.size(), deleteBuffer.size(), 0); + Object[] results = familyMapBatch(familyMap, resultSuccess, isMix); + + // merge result + for (int i = 0; i < getsResults.length; i++) { + if (getsResults[i] != null) { + if (results[i] != null) { + throw new IOException("unexpected batch result, table: " + tableNameString + ", result: " + results[i]); + } + results[i] = getsResults[i]; + } + } + return results; + } + private List generateGetResult(ObTableSingleOpResult getResult) throws IOException { List cells = new ArrayList<>(); ObTableSingleOpEntity singleOpEntity = getResult.getEntity(); @@ -875,6 +1063,9 @@ private void processColumnFilters(NavigableSet columnFilters, @Override public Result get(final Get get) throws IOException { + if (isDDSMode()) { + return getForDDS(get); + } if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().isEmpty()) { // check nothing, use table group; } else { @@ -949,7 +1140,13 @@ public Result call() throws IOException { @Override public Result[] get(List gets) throws IOException { Result[] results = new Result[gets.size()]; - if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version + + // In DDS mode, avoid potential recursion by using individual get operations + if (isDDSMode()) { + for (int i = 0; i < gets.size(); i++) { + results[i] = get(gets.get(i)); + } + } else if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version batch(gets, results); } else { List> futures = new LinkedList<>(); @@ -969,6 +1166,76 @@ public Result[] get(List gets) throws IOException { return results; } + public Result getForDDS(final Get get) throws IOException { + checkFamilyViolation(get.getFamilyMap().keySet()); + ServerCallable serverCallable = getRequestCallable(get); + return executeServerCallable(serverCallable); + } + + private ServerCallable getRequestCallable(Get get) { + return new ServerCallable(configuration, + executeAble, + tableNameString, + get.getRow(), + get.getRow(), + operationTimeout) { + public Result call() throws IOException { + List keyValueList = new ArrayList(); + for (Map.Entry> entry : get.getFamilyMap().entrySet()) { + byte[] family = entry.getKey(); + String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family), configuration); + + try { + long t1 = System.currentTimeMillis(); + ObTableQuery obTableQuery; + ObHTableFilter filter = buildObHTableFilter(get.getFilter(), + get.getTimeRange(), get.getMaxVersions(), entry.getValue()); + + if (queryOptimized(entry.getValue())) { + obTableQuery = buildObTableQuery(filter, get.getRow(), get.getRow(), + get.getTimeRange(), -1, entry.getValue()); + } else { + obTableQuery = buildObTableQuery(filter, get.getRow(), true, + get.getRow(), true, -1); + } + + ObTableQueryRequest request = buildObTableQueryRequest(obTableQuery, targetTableName); + OperationExecuteAble executeAble = obTableClient != null ? this.obTableClient : this.executeAble; + ObTableClientQueryStreamResult clientQueryStreamResult = (ObTableClientQueryStreamResult) executeAble.execute(request); + byte[] newK = new byte[0]; + byte[] newQ = new byte[0]; + while (clientQueryStreamResult.next()) { + List row = clientQueryStreamResult.getRow(); + byte[] k = (byte[]) row.get(0).getValue(); + byte[] q = (byte[]) row.get(1).getValue(); + long t = (Long) row.get(2).getValue(); + byte[] v = (byte[]) row.get(3).getValue(); + if (k == null && q == null) { + k = newK; + q = newQ; + } else { + newK = k; + newQ = q; + } + keyValueList.add(new KeyValue(k, family, q, t, v)); + } + + TableHBaseLoggerFactory.MONITOR.info("get cost: {}ms, table: {}, optimize: {}, cells: {}", + (System.currentTimeMillis() - t1), + getTargetTableName(tableNameString, Bytes.toString(family), configuration), + queryOptimized(), keyValueList.size()); + } catch (Exception e) { + TableHBaseLoggerFactory.logger.error(LCD.convert("01-00002"), tableNameString, + Bytes.toString(family), e); + throw new IOException("query table:" + tableNameString + " family " + + Bytes.toString(family) + " error.", e); + } + } + return new Result(keyValueList); + } + }; + } + /** * hbase的获取这个rowkey,没有的话就前一个rowkey,不支持 * @param row row @@ -982,6 +1249,10 @@ public Result getRowOrBefore(byte[] row, byte[] family) { @Override public ResultScanner getScanner(final Scan scan) throws IOException { + if (obTableClient == null) { + return getScannerForDDS(scan); + } + if (scan.getFamilyMap().keySet().isEmpty()) { // check nothing, use table group; } else { @@ -1162,6 +1433,129 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept return getScanner(scan); } + public boolean queryOptimized() { + return this.localHTableQueryOptimize || this.hTableQueryOptimize; + } + + public boolean queryOptimized(Set qualifiers) { + boolean ret = false; + if (queryOptimized() && qualifiers != null && !qualifiers.isEmpty()) { + for (byte[] aQualifier : qualifiers) { + if (aQualifier != null && aQualifier.length > 0) { + ret = true; + } else { + return false; + } + } + } + return ret; + } + + public ResultScanner getScannerForDDS(final Scan scan) throws IOException { + checkFamilyViolation(scan.getFamilyMap().keySet()); + //be careful about the packet size, if the packet exceed the max result size, leading to error + ServerCallable serverCallable = new ServerCallable( + configuration, executeAble, tableNameString, scan.getStartRow(), scan.getStopRow(), + operationTimeout) { + public ResultScanner call() throws Exception { + if (scan.getFamilyMap().keySet().isEmpty() || scan.getFamilyMap().size() > 1) { + throw new FeatureNotSupportedException("scan family map is not supported yet"); + } else { + for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { + byte[] f = entry.getKey(); + try { + ObHTableFilter filter = buildObHTableFilter(scan.getFilter(), + scan.getTimeRange(), scan.getMaxVersions(), entry.getValue()); + ObTableQuery obTableQuery; + if (Arrays.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW) + && Arrays.equals(scan.getStopRow(), HConstants.EMPTY_START_ROW)) { + obTableQuery = buildObTableQuery(filter, (ObNewRange)null, scan.getBatch()); + } else if (!scan.isReversed() && queryOptimized(entry.getValue())) { + obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, + scan.getStopRow(), false, scan.getBatch(), entry.getValue()); + } else if (scan.isReversed()) { + // 由于 HBase 接口与 OB 接口表达范围的差异, reverse scan 需要交换 startRow 和 stopRow + obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, + scan.getStartRow(), true, scan.getBatch()); + obTableQuery.setScanOrder(ObScanOrder.Reverse); + } else { + obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, + scan.getStopRow(), false, scan.getBatch()); + } + + obTableQuery.setMaxResultSize(scan.getMaxResultSize()); + long t1 = System.currentTimeMillis(); + ObTableQueryRequest request = buildObTableQueryRequest(obTableQuery, + getTargetTableName(tableNameString, Bytes.toString(f), configuration)); + ObTableClientQueryStreamResult clientQueryStreamResult = (ObTableClientQueryStreamResult) executeAble + .execute(request); + TableHBaseLoggerFactory.MONITOR.info("get cost query result: {}ms, table: {}, query optimize: {}", + (System.currentTimeMillis() - t1), + getTargetTableName(tableNameString, Bytes.toString(f), configuration), queryOptimized()); + return new ClientStreamScanner(clientQueryStreamResult, tableNameString, f, false); + } catch (Exception e) { + logger + .error(LCD.convert("01-00003"), tableNameString, Bytes.toString(f), e); + throw new IOException("scan table:" + tableNameString + " family " + + Bytes.toString(f) + " error.", e); + } + } + } + + throw new IOException("scan table:" + tableNameString + "has no family"); + } + }; + return executeServerCallable(serverCallable); + } + + private void checkFamilyViolation(Collection families) { + for (byte[] family : families) { + if (isBlank(Bytes.toString(family))) { + throw new IllegalArgumentException("family is blank"); + } + this.familyString = new String(family, StandardCharsets.UTF_8); + this.currentTableFamily = tableNameString.toLowerCase() + CH_SPLITE_TYPE + familyString.toLowerCase(); + } + + } + + private void innerDeleteForDDS(Delete delete) throws IOException { + checkArgument(delete.getRow() != null, "row is null"); + checkArgument(!delete.isEmpty(), "delete is empty"); + List errorCodeList = new ArrayList(); + try { + checkFamilyViolation(delete.getFamilyMap().keySet()); + + Map.Entry> entry = delete.getFamilyMap().entrySet().iterator() + .next(); + ObTableBatchOperation batch = buildObTableBatchOperation(entry.getValue(), false, null, + false); + + ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch, + getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration)); + ObTableBatchOperationResult result = (ObTableBatchOperationResult) executeAble + .execute(request); + boolean hasError = false; + int throwErrorCode = 0; + for (ObTableOperationResult obTableOperationResult : result.getResults()) { + int errorCode = obTableOperationResult.getHeader().getErrno(); + errorCodeList.add(errorCode); + if (errorCode != 0) { + hasError = true; + throwErrorCode = errorCode; + } + } + + if (hasError) { + throw new ObTableException(throwErrorCode); + } + } catch (Exception e) { + logger.error(LCD.convert("01-00004"), tableNameString, errorCodeList, e); + throw new IOException("delete table " + tableNameString + " error codes " + + errorCodeList, e); + } + } + @Override public void put(Put put) throws IOException { doPut(Collections.singletonList(put)); @@ -1282,13 +1676,23 @@ private void innerDelete(Delete delete) throws IOException { @Override public void delete(Delete delete) throws IOException { checkFamilyViolation(delete.getFamilyMap().keySet(), false); - innerDelete(delete); + if (isDDSMode()) { + innerDeleteForDDS(delete); + } else { + innerDelete(delete); + } } @Override public void delete(List deletes) throws IOException { - for (Delete delete : deletes) { - innerDelete(delete); + if (isDDSMode()) { // DDS mode + for (Delete delete : deletes) { + innerDeleteForDDS(delete); + } + } else { + for (Delete delete : deletes) { + innerDelete(delete); + } } } @@ -1366,8 +1770,12 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); - ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient - .execute(request); + ObTableQueryAndMutateResult result = null; + if (!isDDSMode()) { + result = (ObTableQueryAndMutateResult) obTableClient.execute(request); + } else { + result = (ObTableQueryAndMutateResult) executeAble.execute(request); + } return result.getAffectedRows() > 0; } @@ -1405,9 +1813,16 @@ public Result append(Append append) throws IOException { ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, batchOperation, getTargetTableName(tableNameString, Bytes.toString(f), configuration)); - request.setReturningAffectedEntity(true); - ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient - .execute(request); + request.setReturningAffectedEntity(append.isReturnResults()); + ObTableQueryAndMutateResult result = null; + if (obTableClient != null) { + result = (ObTableQueryAndMutateResult) obTableClient.execute(request); + } else { + result = (ObTableQueryAndMutateResult) executeAble.execute(request); + } + if (!append.isReturnResults()) { + return null; + } ObTableQueryResult queryResult = result.getAffectedEntity(); List keyValues = new ArrayList(); for (List row : queryResult.getPropertiesRows()) { @@ -1466,8 +1881,12 @@ public Result increment(Increment increment) throws IOException { ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration)); request.setReturningAffectedEntity(true); - ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient - .execute(request); + ObTableQueryAndMutateResult result = null; + if (obTableClient != null) { + result = (ObTableQueryAndMutateResult) obTableClient.execute(request); + } else { + result = (ObTableQueryAndMutateResult) executeAble.execute(request); + } ObTableQueryResult queryResult = result.getAffectedEntity(); List keyValues = new ArrayList(); for (List row : queryResult.getPropertiesRows()) { @@ -1516,8 +1935,12 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration)); request.setReturningAffectedEntity(true); - ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient - .execute(request); + ObTableQueryAndMutateResult result = null; + if (obTableClient != null) { + result = (ObTableQueryAndMutateResult) obTableClient.execute(request); + } else { + result = (ObTableQueryAndMutateResult) executeAble.execute(request); + } ObTableQueryResult queryResult = result.getAffectedEntity(); if (queryResult.getPropertiesRows().size() != 1) { throw new IllegalStateException("the increment result size illegal " @@ -1550,7 +1973,10 @@ public boolean isAutoFlush() { @Override public void flushCommits() throws IOException { - + if (isDDSMode()) { + flushCommitsForDDS(); + return; + } try { if (writeBuffer.isEmpty()){ return; @@ -1611,6 +2037,45 @@ public void flushCommits() throws IOException { } } } + + private void flushCommitsForDDS() throws IOException { + try { + boolean[] resultSuccess = new boolean[writeBuffer.size()]; + try { + Map, List>> familyMap = new HashMap, List>>(); + for (int i = 0; i < writeBuffer.size(); i++) { + Put aPut = writeBuffer.get(i); + Map> innerFamilyMap = aPut.getFamilyMap(); + // multi family can not ensure automatic + geneInnerFamilyMap(i, innerFamilyMap, familyMap); + } + familyMapBatch(familyMap, resultSuccess, false); + + } finally { + // mutate list so that it is empty for complete success, or contains + // only failed records results are returned in the same order as the + // requests in list walk the list backwards, so we can remove from list + // without impacting the indexes of earlier members + for (int i = resultSuccess.length - 1; i >= 0; i--) { + if (resultSuccess[i]) { + // successful Puts are removed from the list here. + writeBuffer.remove(i); + } + } + } + } finally { + if (clearBufferOnFail) { + writeBuffer.clear(); + currentWriteBufferSize = 0; + } else { + // the write buffer was adjusted by processBatchOfPuts + currentWriteBufferSize = 0; + for (Put aPut : writeBuffer) { + currentWriteBufferSize += aPut.heapSize(); + } + } + } + } @Override public void close() throws IOException { @@ -1742,8 +2207,12 @@ public void batchCoprocessorService(Descriptors.MethodDescri @Override public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; - this.obTableClient.setRuntimeMaxWait(operationTimeout); - this.obTableClient.setRuntimeBatchMaxWait(operationTimeout); + if (this.obTableClient != null) { + this.obTableClient.setRuntimeMaxWait(operationTimeout); + this.obTableClient.setRuntimeBatchMaxWait(operationTimeout); + } else { // TODO: dds set configuration + + } this.operationExecuteInPool = this.configuration.getBoolean( HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL, (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); @@ -1768,7 +2237,75 @@ public int getRpcTimeout() { } public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) { - this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor); + if (this.obTableClient != null) { + this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor); + } + } + + /** + * 封装batch请求。返回batch操作的结果集results + * @param familyMap 所有的batch请求的容器 + * @param resultSuccess batch请求的返回值数组 + * @param isMix 是否为混合请求的batch + * @throws IOException + */ + private Object[] familyMapBatch(Map, List>> familyMap, + boolean[] resultSuccess, boolean isMix) throws IOException { + Object[] batchResults = new Object[resultSuccess.length]; + + for (Map.Entry, List>> entry : familyMap.entrySet()) { + List errorCodeList = new ArrayList(entry.getValue().getSecond().size()); + ObTableBatchOperation batch; + try { + String targetTableName = getTargetTableName(this.tableNameString, entry.getKey(), configuration); + batch = buildObTableBatchOperation(entry.getValue().getSecond(), false, null, + isMix); + ObTableBatchOperationRequest request = buildObTableBatchOperationRequest(batch, targetTableName); + ObTableBatchOperationResult result = (ObTableBatchOperationResult) executeAble.execute(request); + List resultList = result.getResults(); + Map throwResults = new HashMap<>(); + + for (ObTableOperationResult opResult : resultList) { + int errorCode = opResult.getHeader().getErrno(); + errorCodeList.add(errorCode); + if (errorCode != 0) { + throwResults.put(entry.getValue().getFirst().get(0), opResult); + } + } + + for (Map.Entry throwResult : throwResults.entrySet()) { + if (throwResult.getValue() != null) { + ExceptionUtil.throwObTableException(throwResult.getValue().getExecuteHost(), + throwResult.getValue().getExecutePort(), throwResult.getValue().getSequence(), + throwResult.getValue().getUniqueId(), throwResult.getValue().getHeader().getErrno(), targetTableName); + } + } + + for (Integer index : entry.getValue().getFirst()) { + resultSuccess[index] = true; + List keyValues = new ArrayList<>(); + keyValues.add(new KeyValue()); + Result kvRes = new Result(keyValues); + batchResults[index] = kvRes; + } + } catch (Exception e) { + if (e instanceof ObTableException) { + ObTableException ee = (ObTableException)e; + errorCodeList.add(ee.getErrorCode()); + } + + logger.error(LCD.convert("01-00008"), tableNameString, errorCodeList, autoFlush, + resultSuccess.length, e); + + throw new IOException( + "batch mutate table " + tableNameString + ", error codes " + errorCodeList + + ", auto flush " + autoFlush + ", and current buffer size " + + resultSuccess.length + + ". batch opertaions failed. may caused by put and delete the same k/q at a batch or " + + "don't have any data", e); + } + } + return batchResults; } T executeServerCallable(final ServerCallable serverCallable) throws IOException { @@ -1858,6 +2395,118 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i return obHTableFilter; } + private ObTableQuery buildObTableQuery(ObHTableFilter filter, ObNewRange obNewRange, + int batchSize) { + ObTableQuery obTableQuery = new ObTableQuery(); + obTableQuery.setIndexName("PRIMARY"); + obTableQuery.sethTableFilter(filter); + for (String column : ALL_COLUMNS) { + obTableQuery.addSelectColumn(column); + } + if (obNewRange != null) { + obTableQuery.addKeyRange(obNewRange); + } + if (batchSize > 0) { + obTableQuery.setBatchSize(batchSize); + } + return obTableQuery; + } + + private ObTableQuery buildObTableQuery(ObHTableFilter filter, + byte[] start, boolean includeStart, + byte[] stop, boolean includeStop, + int batchSize, Set qualifiers) { + List obRangeList = new ArrayList<>(); + for (byte[] qualifier : qualifiers) { + if (qualifier.length == 0) { + throw new ObTableUnexpectedException("qualifier is empty when use htable optimize query"); + } + + ObNewRange obNewRange = new ObNewRange(); + if (includeStart) { + obNewRange.setStartKey(ObRowKey.getInstance(start, qualifier, ObObj.getMin())); + } else { + obNewRange.setStartKey(ObRowKey.getInstance(start, qualifier, ObObj.getMax())); + } + + if (Arrays.equals(stop, HConstants.EMPTY_START_ROW)) { + obNewRange.setEndKey(ObRowKey.getInstance(ObObj.getMax(), qualifier, ObObj.getMax())); + } else if (includeStop) { + obNewRange.setEndKey(ObRowKey.getInstance(stop, qualifier, ObObj.getMax())); + } else { + obNewRange.setEndKey(ObRowKey.getInstance(stop, qualifier, ObObj.getMin())); + } + obRangeList.add(obNewRange); + } + + return buildObTableQuery(filter, obRangeList, batchSize); + } + + private ObTableQuery buildObTableQuery(ObHTableFilter filter, List obNewRangeList, + int batchSize) { + ObTableQuery obTableQuery = new ObTableQuery(); + obTableQuery.setIndexName("PRIMARY"); + obTableQuery.sethTableFilter(filter); + for (String column : ALL_COLUMNS) { + obTableQuery.addSelectColumn(column); + } + if (obNewRangeList.size() > 0) { + for (ObNewRange obNewRange : obNewRangeList) { + obTableQuery.addKeyRange(obNewRange); + } + } + if (batchSize > 0) { + obTableQuery.setBatchSize(batchSize); + } + return obTableQuery; + } + private ObTableQuery buildObTableQuery(ObHTableFilter filter, + byte[] start, byte[] stop, TimeRange tr, + int batchSize, Set qualifiers) { + List obRangeList = new ArrayList<>(); + for (byte[] qualifier : qualifiers) { + if (qualifier.length == 0) { + throw new ObTableUnexpectedException("qualifier is empty when use htable optimize query"); + } + + ObNewRange obNewRange = new ObNewRange(); + obNewRange.setStartKey(ObRowKey.getInstance(start, qualifier, -tr.getMax())); + if (Arrays.equals(stop, HConstants.EMPTY_START_ROW)) { + obNewRange.setEndKey(ObRowKey.getInstance(ObObj.getMax(), qualifier, -tr.getMin())); + } else { + obNewRange.setEndKey(ObRowKey.getInstance(stop, qualifier, -tr.getMin())); + } + obRangeList.add(obNewRange); + } + + return buildObTableQuery(filter, obRangeList, batchSize); + } + + private ObTableQuery buildObTableQuery(ObHTableFilter filter, byte[] start, + boolean includeStart, byte[] stop, boolean includeStop, + int batchSize) { + ObNewRange obNewRange = new ObNewRange(); + + if (includeStart) { + obNewRange.setStartKey(ObRowKey.getInstance(start, ObObj.getMin(), ObObj.getMin())); + } else { + obNewRange.setStartKey(ObRowKey.getInstance(start, ObObj.getMax(), ObObj.getMax())); + } + + if (Arrays.equals(stop, HConstants.EMPTY_START_ROW)) { + obNewRange.setEndKey(ObRowKey.getInstance(ObObj.getMax(), ObObj.getMax(), + ObObj.getMax())); + } else if (includeStop) { + obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMax(), ObObj.getMax())); + } else { + obNewRange.setEndKey(ObRowKey.getInstance(stop, ObObj.getMin(), ObObj.getMin())); + } + + return buildObTableQuery(filter, obNewRange, batchSize); + } + + + private byte[] buildCheckAndMutateFilterString(byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value) throws IOException { @@ -2009,6 +2658,26 @@ public static ObTableBatchOperation buildObTableBatchOperation(List ke return batch; } + private ObTableBatchOperation buildObTableBatchOperation(List keyValueList, + boolean putToAppend, + List qualifiers, + boolean isMix) { + ObTableBatchOperation batch = new ObTableBatchOperation(); + for (KeyValue kv : keyValueList) { + if (qualifiers != null) { + qualifiers.add(kv.getQualifier()); + } + batch.addTableOperation(buildObTableOperation(kv, putToAppend)); + } + // batch with the same type + batch.setSameType(!isMix); + batch.setSamePropertiesNames(true); + return batch; + } + + + + private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv, ObTableOperationType operationType, boolean isTableGroup) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java index f0d4a447..f2c5e5ac 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.hbase.constants.OHConstants; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.hbase.util.ExecuteAbleManager; import com.alipay.oceanbase.hbase.util.KeyDefiner; import com.alipay.oceanbase.hbase.util.OHTableFactory; import com.google.protobuf.Descriptors; @@ -39,12 +40,14 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import static com.alipay.oceanbase.hbase.constants.OHConstants.*; +import static org.apache.commons.lang.StringUtils.isNotBlank; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; public class OHTablePool implements Closeable { @@ -156,6 +159,43 @@ public OHTablePool(final Configuration config, final int maxSize, this.tables = new PoolMap<>(this.poolType, this.maxSize); } + public void init() throws Exception { + if (tableAttributes != null) { + Map> tableAttributeMap = new HashMap>(); + for (Map.Entry entry : tableAttributes.entrySet()) { + String key = entry.getKey(); + byte[] value = entry.getValue(); + String[] parsedKey = KeyDefiner.parsePoolOHTableAttributeName(key); + if (parsedKey != null) { + String tableName = parsedKey[0]; + String attributeName = parsedKey[1]; + + Map attributeMap = tableAttributeMap.get(tableName); + + if (attributeMap == null) { + attributeMap = new HashMap(); + tableAttributeMap.put(tableName, attributeMap); + } + + attributeMap.put(attributeName, Bytes.toString(value)); + } + } + + for (Map.Entry> entry : tableAttributeMap.entrySet()) { + Map attributeMap = entry.getValue(); + String paramUrl = attributeMap.get(HBASE_OCEANBASE_PARAM_URL); + String fullUserName = attributeMap.get(HBASE_OCEANBASE_FULL_USER_NAME); + String password = attributeMap.get(HBASE_OCEANBASE_PASSWORD); + ExecuteAbleManager.getOrCreateObTableClient(config, paramUrl, fullUserName, + password); + } + } + + if (isNotBlank(config.get(HBASE_OCEANBASE_DDS_APP_NAME))) { + ExecuteAbleManager.getOrCreateDdsObTableClient(config); + } + } + /** * Get a reference to the specified table from the pool. * diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java index e10ef0b1..4f5c2ea8 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java +++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java @@ -161,6 +161,20 @@ public final class OHConstants { public static final String SOCKET_TIMEOUT = "ipc.socket.timeout"; - public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds +public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds + + + /** + * DDS (Data Distribution Service) related constants + */ + public static final String HBASE_OCEANBASE_DDS_APP_NAME = "hbase.oceanbase.dds.app.name"; + public static final String HBASE_OCEANBASE_DDS_APP_DS_NAME = "hbase.oceanbase.dds.app.ds.name"; + public static final String HBASE_OCEANBASE_DDS_VERSION = "hbase.oceanbase.dds.version"; + public static final String HBASE_HTABLE_CLIENT_WRITE_BUFFER = "hbase.client.write.buffer"; + + public static final long DEFAULT_HBASE_HTABLE_CLIENT_WRITE_BUFFER = 2097152; + + public static final char CH_SINGLE_QUOTA = '\''; + public static final char CH_SPLITE_TYPE = '|'; } diff --git a/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java b/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java index 805503cf..4b3c125f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/execute/ServerCallable.java @@ -18,6 +18,7 @@ package com.alipay.oceanbase.hbase.execute; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.OperationExecuteAble; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.ConnectionUtils; @@ -34,6 +35,7 @@ public abstract class ServerCallable implements Callable { protected final Configuration conf; protected final ObTableClient obTableClient; + protected final OperationExecuteAble executeAble; protected final String tableNameString; protected int callTimeout; protected long globalStartTime, endTime; @@ -52,6 +54,27 @@ public ServerCallable(Configuration conf, ObTableClient obTableClient, String ta byte[] startRow, byte[] endRow, int callTimeout) { this.conf = conf; this.obTableClient = obTableClient; + this.executeAble = null; + this.tableNameString = tableNameString; + this.callTimeout = callTimeout; + this.startRow = startRow; + this.endRow = endRow; + } + + /** + * ServerCallable + * @param conf the conf to use + * @param executeAble executeAble to use + * @param tableNameString Table name to which tableNameString belongs. + * @param startRow start row + * @param endRow end row + * @param callTimeout timeout + */ + public ServerCallable(Configuration conf, OperationExecuteAble executeAble, String tableNameString, + byte[] startRow, byte[] endRow, int callTimeout) { + this.conf = conf; + this.obTableClient = null; + this.executeAble = executeAble; this.tableNameString = tableNameString; this.callTimeout = callTimeout; this.startRow = startRow; diff --git a/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/OHTableMetaInitializer.java b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/OHTableMetaInitializer.java new file mode 100644 index 00000000..4c2febab --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/OHTableMetaInitializer.java @@ -0,0 +1,46 @@ +/*- + * #%L + * OceanBase Table Hbase Framework + * %% + * Copyright (C) 2016 - 2018 Ant Financial Services Group + * %% + * OBKV HBase Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.hbase.qualifiertype; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author qiaoyunyao + * @version 1: OHTableMetaInitializer.java, v 0.1 2023年07月14日 16:59 qiaoyunyao Exp $ + */ +public class OHTableMetaInitializer { + + private static ConcurrentHashMap> tableFamilyTypeMap = new ConcurrentHashMap<>(); + + // 获取用户qualifier的类型信息,缓存到qualifierTypeMap中。新插入的k-v会覆盖原本的v + public static void setQualifierType(String tableName, String family, Map qualifierTypeMaps) { + String tableFamily = tableName + "|" + family; + tableFamilyTypeMap.put(tableFamily, qualifierTypeMaps); + + } + + public static ConcurrentHashMap> getTableFamilyTypeMap() { + return tableFamilyTypeMap; + } + + public static void clearQualifierMap(){ + tableFamilyTypeMap.clear(); + } + +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/ObQualifierType.java b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/ObQualifierType.java new file mode 100644 index 00000000..3b1b8cdc --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/qualifiertype/ObQualifierType.java @@ -0,0 +1,46 @@ +/*- + * #%L + * OceanBase Table Hbase Framework + * %% + * Copyright (C) 2016 - 2018 Ant Financial Services Group + * %% + * OBKV HBase Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.hbase.qualifiertype; + +/** + * @author qiaoyunyao + * @version 1: ObQualifierType.java, v 0.1 2023年07月12日 18:16 qiaoyunyao Exp $ + */ +public enum ObQualifierType { + INVALID_TYPE(0), Long(1), Int(2), Double(3); + + private final int code; + + ObQualifierType(int code) { + this.code = code; + } + + @Override + public String toString() { + switch (code) { + case 1: + return "Long"; + case 2: + return "Int"; + case 3: + return "Double"; + default: + return "INVALID_TYPE"; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ExecuteAbleManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ExecuteAbleManager.java new file mode 100644 index 00000000..20b6991f --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ExecuteAbleManager.java @@ -0,0 +1,356 @@ +/*- + * #%L + * OceanBase Table Hbase Framework + * %% + * Copyright (C) 2016 - 2018 Ant Financial Services Group + * %% + * OBKV HBase Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.hbase.util; + +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.OperationExecuteAble; +import com.alipay.oceanbase.rpc.constant.Constants; +import com.alipay.oceanbase.rpc.property.Property; +import com.alipay.oceanbase.rpc.dds.DdsObTableClient; +import com.google.common.base.Objects; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +import static com.alipay.oceanbase.hbase.constants.OHConstants.*; +import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument; +import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD; +import static org.apache.commons.lang.StringUtils.isNotBlank; + +/** + * @author zhiqi.zzq + * @since 2018/7/24 下午9:23 + */ +public class ExecuteAbleManager { + private static final Logger logger = TableHBaseLoggerFactory + .getLogger("ExecuteAbleManager"); + + public static final ConcurrentHashMap OB_TABLE_CLIENT_LOCK = new ConcurrentHashMap(); + public static final Map OB_TABLE_CLIENT_INSTANCE = new ConcurrentHashMap(); + + public static final ConcurrentHashMap DDS_OB_TABLE_CLIENT_LOCK = new ConcurrentHashMap(); + public static final Map DDS_OB_TABLE_CLIENT_INSTANCE = new ConcurrentHashMap(); + + /** + * Get or create the execution handler. + * + * Generally , we should not mix the table-specific paramURL mode with the appDataSource mode. + * If you do mix the methods you should be clearly aware of that you will get the sharding client when + * the specific table is not declared。 + * + * @param conf the config + */ + public static OperationExecuteAble getOrCreateExecuteAble(Configuration conf) + throws IllegalArgumentException, + IOException { + // 1. try to get ObTableClient according to the table-specific paramURL + String paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL); + String fullUserName = conf.get(HBASE_OCEANBASE_FULL_USER_NAME); + if (isNotBlank(paramUrl) || isNotBlank(fullUserName)) { + String password = conf.get(HBASE_OCEANBASE_PASSWORD, Constants.EMPTY_STRING); + return getOrCreateObTableClient(conf, paramUrl, fullUserName, password); + } + + // 2. otherwise, fall into the DdsObTableClient branch + return getOrCreateDdsObTableClient(conf); + } + + /** + * Get or create ObTableClient according to the obTableClientKey. + * + * @param obTableClientKey + * @return + * @throws IOException + */ + public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey) + throws IOException { + if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) { + ReentrantLock tmp = new ReentrantLock(); + ReentrantLock lock = OB_TABLE_CLIENT_LOCK.putIfAbsent(obTableClientKey, tmp); + lock = lock == null ? tmp : lock; + lock.lock(); + try { + if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) { + ObTableClient obTableClient = new ObTableClient(); + obTableClient.setParamURL(obTableClientKey.getParamUrl()); + obTableClient.setFullUserName(obTableClientKey.getFullUserName()); + obTableClient.setPassword(obTableClientKey.getPassword()); + obTableClient.setProperties(obTableClientKey.getProperties()); + obTableClient.setRunningMode(ObTableClient.RunningMode.HBASE); + obTableClient.init(); + OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient); + if (logger.isInfoEnabled()) { + logger.info("create ObTableClient success with {}", obTableClientKey); + } + } + } catch (Exception e) { + logger.error(LCD.convert("01-00009"), obTableClientKey, e); + throw new IOException("create ObTableClient error with " + obTableClientKey, e); + } finally { + lock.unlock(); + } + } + return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey); + } + + /** + * Get or create ObTableClient according to paramUrl info. + * + * @param conf + * @param paramUrl + * @param fullUserName + * @param password + * @return + * @throws IOException + */ + public static ObTableClient getOrCreateObTableClient(Configuration conf, String paramUrl, + String fullUserName, String password) + throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("try to getOrCreateObTableClient with paramUrl:" + paramUrl + + ", fullUserName:" + fullUserName); + } + checkArgument(isNotBlank(paramUrl), HBASE_OCEANBASE_PARAM_URL + " is blank"); + checkArgument(isNotBlank(fullUserName), HBASE_OCEANBASE_FULL_USER_NAME + " is blank"); + + ObTableClientKey obTableClientKey = new ObTableClientKey(); + obTableClientKey.setParamUrl(paramUrl); + obTableClientKey.setFullUserName(fullUserName); + obTableClientKey.setPassword(password); + for (Property property : Property.values()) { + String value = conf.get(property.getKey()); + if (value != null) { + obTableClientKey.getProperties().put(property.getKey(), value); + } + } + return getOrCreateObTableClient(obTableClientKey); + } + + /** + * ObTableClientKey consists of the triplet: paramUrl, fullUserName and password. + */ + public static class ObTableClientKey { + + private String paramUrl; + private String fullUserName; + private String password; + + private Properties properties = new Properties(); + + public String getParamUrl() { + return paramUrl; + } + + public void setParamUrl(String paramUrl) { + this.paramUrl = paramUrl; + } + + public String getFullUserName() { + return fullUserName; + } + + public void setFullUserName(String fullUserName) { + this.fullUserName = fullUserName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ObTableClientKey that = (ObTableClientKey) o; + return Objects.equal(paramUrl, that.paramUrl) + && Objects.equal(fullUserName, that.fullUserName) + && Objects.equal(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hashCode(paramUrl, fullUserName, password); + } + + @Override + public String toString() { + return "ObTableClientKey{" + "paramUrl='" + paramUrl + '\'' + ", fullUserName='" + + fullUserName + '\'' + ", properties=" + properties + '}'; + } + } + + /** + * DdsObTableClientKey consists of the appDataSource triplet: appName, appDsName and version. + */ + public static class DdsObTableClientKey { + + private String appName; + private String appDsName; + private String version; + + private Properties properties = new Properties(); + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + + public String getAppDsName() { + return appDsName; + } + + public void setAppDsName(String appDsName) { + this.appDsName = appDsName; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof DdsObTableClientKey)) + return false; + DdsObTableClientKey that = (DdsObTableClientKey) o; + return Objects.equal(getAppName(), that.getAppName()) + && Objects.equal(getAppDsName(), that.getAppDsName()) + && Objects.equal(getVersion(), that.getVersion()); + } + + @Override + public int hashCode() { + return Objects.hashCode(getAppName(), getAppDsName(), getVersion()); + } + + @Override + public String toString() { + return "DdsObTableClientKey{" + "appName='" + appName + '\'' + ", appDsName='" + + appDsName + '\'' + ", version='" + version + '\'' + ", properties=" + + properties + '}'; + } + } + + /** + * Get or create DdsObTableClient according to ddsObTableClientKey. + * + * @param ddsObTableClientKey + * @return + * @throws IOException + */ + public static DdsObTableClient getOrCreateDdsObTableClient(DdsObTableClientKey ddsObTableClientKey) + throws IOException { + if (DDS_OB_TABLE_CLIENT_INSTANCE.get(ddsObTableClientKey) == null) { + ReentrantLock tmp = new ReentrantLock(); + ReentrantLock lock = DDS_OB_TABLE_CLIENT_LOCK.putIfAbsent(ddsObTableClientKey, tmp); + lock = lock == null ? tmp : lock; + lock.lock(); + try { + if (DDS_OB_TABLE_CLIENT_INSTANCE.get(ddsObTableClientKey) == null) { + DdsObTableClient ddsObTableClient = new DdsObTableClient(); + ddsObTableClient.setAppName(ddsObTableClientKey.getAppName()); + ddsObTableClient.setAppDsName(ddsObTableClientKey.getAppDsName()); + ddsObTableClient.setVersion(ddsObTableClientKey.getVersion()); + ddsObTableClient.setRunningMode(ObTableClient.RunningMode.HBASE); + ddsObTableClient.setTableClientProperty(ddsObTableClientKey.getProperties()); + ddsObTableClient.init(); + DDS_OB_TABLE_CLIENT_INSTANCE.put(ddsObTableClientKey, ddsObTableClient); + logger.info("create DdsObTableClient success with {}", ddsObTableClientKey); + } + } catch (Exception e) { + logger.error(LCD.convert("01-00010"), ddsObTableClientKey, e); + throw new IOException("create DdsObTableClient error with " + ddsObTableClientKey, + e); + } finally { + lock.unlock(); + } + } + return DDS_OB_TABLE_CLIENT_INSTANCE.get(ddsObTableClientKey); + } + + /** + * Get or create DdsObTableClient according to appDataSource. + * + * @param conf + * @return + * @throws IllegalArgumentException + * @throws IOException + */ + public static DdsObTableClient getOrCreateDdsObTableClient(Configuration conf) + throws IllegalArgumentException, + IOException { + String appName = conf.get(HBASE_OCEANBASE_DDS_APP_NAME); + String appDsName = conf.get(HBASE_OCEANBASE_DDS_APP_DS_NAME); + String version = conf.get(HBASE_OCEANBASE_DDS_VERSION); + if (logger.isDebugEnabled()) { + logger.debug("try to getOrCreateDdsObTableClient with appName:" + appName + + ", appDsName:" + appDsName + ", version:" + version); + } + checkArgument(isNotBlank(appName), HBASE_OCEANBASE_DDS_APP_NAME + " is blank"); + checkArgument(isNotBlank(appDsName), HBASE_OCEANBASE_DDS_APP_DS_NAME + " is blank"); + checkArgument(isNotBlank(version), HBASE_OCEANBASE_DDS_VERSION + " is blank"); + + DdsObTableClientKey ddsObTableClientKey = new DdsObTableClientKey(); + ddsObTableClientKey.setAppName(appName); + ddsObTableClientKey.setAppDsName(appDsName); + ddsObTableClientKey.setVersion(version); + + for (Property property : Property.values()) { + String value = conf.get(property.getKey()); + if (value != null) { + ddsObTableClientKey.getProperties().put(property.getKey(), value); + } + } + return getOrCreateDdsObTableClient(ddsObTableClientKey); + } +} \ No newline at end of file diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java b/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java index 058d05e7..8e90ecff 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/KeyDefiner.java @@ -25,4 +25,14 @@ public class KeyDefiner { public static String genPooledOHTableAttributeName(String tableName, String key) { return tableName + OHConstants.HBASE_HTABLE_POOL_SEPERATOR + key; } + public static String[] parsePoolOHTableAttributeName(String key) { + if (key == null) { + return null; + } + String[] parsedKey = key.split("\\" + OHConstants.HBASE_HTABLE_POOL_SEPERATOR); + if (parsedKey.length != 2) { + return null; + } + return parsedKey; + } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java index 8d42daa4..7ed4c3ca 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java @@ -92,6 +92,17 @@ public HTableInterface createHTableInterface(Configuration config, byte[] tableN } private Configuration adjustConfiguration(Configuration configuration, String tableName) { + // Check if DDS mode is configured + String ddsAppName = configuration.get(HBASE_OCEANBASE_DDS_APP_NAME); + String ddsAppDsName = configuration.get(HBASE_OCEANBASE_DDS_APP_DS_NAME); + String ddsVersion = configuration.get(HBASE_OCEANBASE_DDS_VERSION); + + if (isNotBlank(ddsAppName) && isNotBlank(ddsAppDsName) && isNotBlank(ddsVersion)) { + // DDS mode - no additional parameter validation needed + // The DDS parameters are already set in configuration + return configuration; + } + byte[] isOdpModeAttr = tablePool.getTableAttribute(tableName, HBASE_OCEANBASE_ODP_MODE); if ((isOdpModeAttr != null && Bytes.toBoolean(isOdpModeAttr)) || (isOdpModeAttr == null && configuration.getBoolean(HBASE_OCEANBASE_ODP_MODE, false))) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java b/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java index f90b5d0a..a66da1f6 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/TableHBaseLoggerFactory.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.helpers.NOPLogger; @InterfaceAudience.Private public class TableHBaseLoggerFactory { @@ -28,6 +29,8 @@ public class TableHBaseLoggerFactory { public static final String TABLE_HBASE_LOGGER_SPACE = "oceanbase-table-hbase"; public static LogCode2Description LCD = LogCode2Description .create(TABLE_HBASE_LOGGER_SPACE); + public static Logger MONITOR = NOPLogger.NOP_LOGGER; + public static Logger logger = NOPLogger.NOP_LOGGER; public static Logger getLogger(String name) { if (name == null || name.isEmpty()) { diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableDDSTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableDDSTest.java new file mode 100644 index 00000000..56365368 --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableDDSTest.java @@ -0,0 +1,832 @@ +/*- + * #%L + * OceanBase Table Hbase Framework + * %% + * Copyright (C) 2016 - 2021 Ant Financial Services Group + * %% + * OBKV HBase Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.PoolMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; + +import static com.alipay.oceanbase.hbase.constants.OHConstants.*; +import static org.apache.hadoop.hbase.util.Bytes.toBytes; + +public class OHTableDDSTest { + private String APP_NAME = "obkv"; + private String APP_DS_NAME_4x = "obkv4_adapt_dds_client_test"; + private String APP_DS_NAME_2x = "obkv4_adapt_dds_client_test_2x"; + private String VERSION = "v1.0"; + private String APP_DS_NAME = APP_DS_NAME_2x; + private String hTableName = APP_DS_NAME.equals(APP_DS_NAME_4x) ? "test" : "testt"; + + protected OHTablePool ohTablePool; + + @Before + public void setup() throws Exception { + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + ohTablePool = new OHTablePool(conf, 10, PoolMap.PoolType.Reusable); + ohTablePool.setRuntimeBatchExecutor("testt", Executors.newFixedThreadPool(33)); + ohTablePool.init(); + } + + /* + * + create table if not exists test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test'; + + create table if not exists test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test'; + + create table if not exists test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test'; + + */ + + /** + * TestCase: 分表路由到单个分区,表为 test$family_00. + * 扩展测试:覆盖00-09所有分区的单分区测试 + */ + @Test + public void testSharding00() throws Exception { + String family = "family"; + String column1 = "column1"; + String value = "value"; + long timestamp = System.currentTimeMillis(); + + // 测试所有分区 00-09 + for (int partition = 0; partition <= 9; partition++) { + String partitionPrefix = String.format("%02d", partition); + String key1 = partitionPrefix + "_TESTKEY0"; + String key2 = partitionPrefix + "_TESTKEY1"; + String key3 = partitionPrefix + "_TESTKEY2"; + + HTableInterface hTable = ohTablePool.getTable(hTableName); + + try { + // 测试单个分区内的操作 + Put put = new Put(toBytes(key1)); + put.add(family.getBytes(), column1.getBytes(), timestamp, + toBytes(value + "_" + partitionPrefix)); + hTable.put(put); + Get get = new Get(toBytes(key1)); + get.addColumn(family.getBytes(), toBytes(column1)); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + + put = new Put(toBytes(key2)); + put.add(family.getBytes(), column1.getBytes(), timestamp, + toBytes(value + "_" + partitionPrefix)); + hTable.put(put); + + put = new Put(toBytes(key3)); + put.add(family.getBytes(), column1.getBytes(), timestamp, + toBytes(value + "_" + partitionPrefix)); + hTable.put(put); + + // 同分区内的scan应该成功 + Scan scan = new Scan(); + scan.addColumn(family.getBytes(), column1.getBytes()); + scan.setStartRow(toBytes(key1)); + scan.setStopRow(toBytes(key3)); + scan.setMaxVersions(9); + ResultScanner scanner = hTable.getScanner(scan); + + int count = 0; + for (Result result : scanner) { + for (KeyValue keyValue : result.raw()) { + System.out.println("Partition " + partitionPrefix + " - rowKey: " + + new String(keyValue.getRow()) + " columnQualifier:" + + new String(keyValue.getQualifier()) + " timestamp:" + + keyValue.getTimestamp() + " value:" + + new String(keyValue.getValue())); + count++; + } + } + scanner.close(); + Assert.assertEquals(2, count); + + } finally { + // 清理数据 + Delete delete = new Delete(toBytes(key1)); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + delete = new Delete(toBytes(key2)); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + delete = new Delete(toBytes(key3)); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + } + } + } + + /** + * TestCase: 分表路由到多个分区,分表为 test$family_{00-09}. + * 扩展测试:覆盖所有00-09分区的跨分区测试 + */ + @Test + public void testSharding01() throws Exception { + String family = "family"; + String column1 = "column1"; + // 扩展到覆盖所有分区 00-09 + String key[] = new String[10]; + for (int i = 0; i < 10; i++) { + key[i] = String.format("%02dTEST_KEY%d", i, i); + } + String value = "value"; + long timestamp = System.currentTimeMillis(); + + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = ohTablePool.getTable(hTableName); + + try { + // 测试所有分区的写入和读取 + for (int i = 0; i < key.length; i++) { + Put put = new Put(toBytes(key[i])); + put.add(family.getBytes(), column1.getBytes(), timestamp, + toBytes(value + "_partition_" + String.format("%02d", i))); + hTable.put(put); + Get get = new Get(toBytes(key[i])); + get.addColumn(family.getBytes(), toBytes(column1)); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + System.out.println("Successfully wrote and read from partition " + + String.format("%02d", i)); + } + + // 测试单个分区内的scan(应该成功) + for (int partition = 0; partition < 10; partition++) { + String partitionPrefix = String.format("%02d", partition); + Scan scan = new Scan(); + scan.addColumn(family.getBytes(), column1.getBytes()); + scan.setStartRow(toBytes(partitionPrefix + "TEST_KEY" + partition)); + scan.setStopRow(toBytes(partitionPrefix + "TEST_KEY" + partition + "Z")); + scan.setMaxVersions(9); + ResultScanner scanner = hTable.getScanner(scan); + + int count = 0; + for (Result result : scanner) { + for (KeyValue keyValue : result.raw()) { + System.out.println("Partition " + partitionPrefix + " scan - rowKey: " + + new String(keyValue.getRow()) + " value: " + + new String(keyValue.getValue())); + count++; + } + } + scanner.close(); + Assert.assertEquals(1, count); + } + + // 跨 partition 的 Scan 是不支持的 - 测试多个跨分区场景 + int[][] crossPartitionTests = { { 0, 2 }, { 1, 4 }, { 5, 8 }, { 3, 9 } }; + for (int[] testCase : crossPartitionTests) { + try { + Scan scan = new Scan(); + scan.addColumn(family.getBytes(), column1.getBytes()); + scan.setStartRow(toBytes(key[testCase[0]])); + scan.setStopRow(toBytes(key[testCase[1]])); + scan.setMaxVersions(9); + hTable.getScanner(scan); + Assert.fail("Expected exception for cross-partition scan from " + testCase[0] + + " to " + testCase[1]); + } catch (Exception e) { + System.out.println("Cross-partition scan correctly failed: " + testCase[0] + + " to " + testCase[1]); + Assert.assertTrue("Cross-partition scan should fail", true); + } + } + } finally { + for (int i = 0; i < key.length; i++) { + Delete delete = new Delete(toBytes(key[i])); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + } + hTable.close(); + } + } + + @Test + public void testAppendIncrement() throws Exception { + String family = "family"; + String column1 = "column1"; + String key1 = "00_TESTKEY0"; + String value = "value"; + String appendValue = "appendValue"; + String key2 = "00_TESTKEY1"; + long incrValue = 100L; + + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = ohTablePool.getTable(hTableName); + + try { + long timestamp = System.currentTimeMillis(); + Put put = new Put(toBytes(key1)); + put.add(family.getBytes(), column1.getBytes(), timestamp, toBytes(value)); + hTable.put(put); + + Get get = new Get(toBytes(key1)); + get.addColumn(family.getBytes(), toBytes(column1)); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + + Assert.assertEquals(value, + new String(r.getValue(family.getBytes(), column1.getBytes()))); + + Append append = new Append(toBytes(key1)); + append.add(family.getBytes(), column1.getBytes(), toBytes(appendValue)); + hTable.append(append); + r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + Assert.assertEquals(value + appendValue, + new String(r.getValue(family.getBytes(), column1.getBytes()))); + + Increment increment = new Increment(toBytes(key2)); + increment.addColumn(family.getBytes(), column1.getBytes(), incrValue); + hTable.increment(increment); + get = new Get(toBytes(key2)); + get.addColumn(family.getBytes(), toBytes(column1)); + r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + System.out.println(Arrays.toString(r.getValue(family.getBytes(), column1.getBytes()))); + long actualValue = Bytes.toLong(r.getValue(family.getBytes(), column1.getBytes())); + Assert.assertEquals(incrValue, actualValue); + } finally { + Delete delete = new Delete(toBytes(key1)); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + delete = new Delete(toBytes(key2)); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + hTable.close(); + } + } + + @Test + public void testCheckAndPut() throws Exception { + String family = "family"; + String column1 = "column1"; + String key1 = "00_TESTKEY0"; + String value = "value"; + String newValue = "newValue"; + long timestamp = System.currentTimeMillis(); + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = ohTablePool.getTable(hTableName); + try { + Put put = new Put(key1.getBytes()); + put.add(family.getBytes(), column1.getBytes(), value.getBytes()); + hTable.put(put); + Get get = new Get(key1.getBytes()); + get.addColumn(family.getBytes(), column1.getBytes()); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + put = new Put(key1.getBytes()); + put.add(family.getBytes(), column1.getBytes(), newValue.getBytes()); + boolean ret = hTable.checkAndPut(key1.getBytes(), family.getBytes(), + column1.getBytes(), value.getBytes(), put); + Assert.assertTrue(ret); + r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + Assert.assertEquals(newValue, + new String(r.getValue(family.getBytes(), column1.getBytes()))); + } finally { + Delete delete = new Delete(key1.getBytes()); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + hTable.close(); + } + } + + /** + * TestCase: 跨数据库测试 - 测试不同数据库 group_00-group_09 的表操作 + * 每个数据库包含 test$family_00-test$family_09 表,覆盖所有分区 + */ + @Test + public void testCrossDatabase() throws Exception { + String family = "family"; + String column1 = "column1"; + String value = "cross_db_value"; + + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = ohTablePool.getTable(hTableName); + + List testKeys = new ArrayList<>(); + try { + // 测试跨数据库和跨表操作:使用key前缀路由到不同的数据库group_XX和表test$family_XX + for (int dbIndex = 0; dbIndex <= 9; dbIndex++) { + for (int tableIndex = 0; tableIndex <= 9; tableIndex++) { + // 使用特定格式的key确保路由到正确的数据库和表 + // 格式:dbIndex_tableIndex_... 来路由到 group_dbIndex 数据库的 test$family_tableIndex 表 + String key = String.format("%02d_%02d_CROSS_DB_TABLE_KEY", dbIndex, tableIndex); + testKeys.add(key); + + // 写入数据 + Put put = new Put(toBytes(key)); + put.add(family.getBytes(), column1.getBytes(), + toBytes(value + "_db" + dbIndex + "_table" + tableIndex)); + hTable.put(put); + + // 验证写入 + Get get = new Get(toBytes(key)); + get.addColumn(family.getBytes(), toBytes(column1)); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + + String expectedValue = value + "_db" + dbIndex + "_table" + tableIndex; + String actualValue = new String(r.getValue(family.getBytes(), column1.getBytes())); + Assert.assertEquals(expectedValue, actualValue); + + System.out.println("Cross-DB-Table test: Successfully wrote/read group_" + + String.format("%02d", dbIndex) + ".test$family_" + String.format("%02d", tableIndex) + + " with key: " + key); + } + } + + // 测试同一分区内的批量操作(避免跨分区批量操作) + // 只在一个特定的数据库表组合中进行批量操作 + int testDbIndex = 0; + int testTableIndex = 0; + List sameBatchPuts = new ArrayList<>(); + for (int i = 0; i < 5; i++) { // 在同一个分区中创建5条记录 + String batchKey = String.format("%02d_%02d_SAME_BATCH_KEY_%d", testDbIndex, testTableIndex, i); + testKeys.add(batchKey); + + Put put = new Put(toBytes(batchKey)); + put.add(family.getBytes(), column1.getBytes(), + toBytes("same_batch_value_" + i + "_db" + testDbIndex + "_table" + testTableIndex)); + sameBatchPuts.add(put); + } + + // 执行同分区批量写入 + hTable.put(sameBatchPuts); + System.out.println("Same-partition batch: Completed batch put of " + sameBatchPuts.size() + + " records in group_" + String.format("%02d", testDbIndex) + ".test$family_" + String.format("%02d", testTableIndex)); + + // 验证批量写入的数据 + for (int i = 0; i < 5; i++) { + String batchKey = String.format("%02d_%02d_SAME_BATCH_KEY_%d", testDbIndex, testTableIndex, i); + Get get = new Get(toBytes(batchKey)); + get.addColumn(family.getBytes(), toBytes(column1)); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + + String expectedValue = "same_batch_value_" + i + "_db" + testDbIndex + "_table" + testTableIndex; + String actualValue = new String(r.getValue(family.getBytes(), column1.getBytes())); + Assert.assertEquals(expectedValue, actualValue); + } + System.out.println("Same-partition batch verification completed successfully"); + + } finally { + // 清理数据 + for (String key : testKeys) { + Delete delete = new Delete(toBytes(key)); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + } + hTable.close(); + } + } + + /** + * TestCase: 跨表测试 - 测试同一数据库内不同表的操作 + * 在每个数据库 group_XX 中测试 test$family_00 到 test$family_09 的表操作 + */ + @Test + public void testCrossTable() throws Exception { + String family = "family"; + String column1 = "column1"; + String value = "cross_table_value"; + + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = ohTablePool.getTable(hTableName); + + List testKeys = new ArrayList<>(); + + try { + // 测试跨表操作:通过不同的key前缀路由到不同的表 test$family_XX + // 这里只测试同一个数据库内的跨表操作 + for (int tableIndex = 0; tableIndex <= 9; tableIndex++) { + for (int recordIndex = 0; recordIndex < 3; recordIndex++) { // 每个表3条记录 + // 使用特定格式确保路由到正确的表 + // 格式:00_tableIndex_... 来路由到 group_00 数据库的 test$family_tableIndex 表 + String key = String.format("00_%02d_TABLE_KEY_%d", tableIndex, recordIndex); + testKeys.add(key); + + // 写入数据 + Put put = new Put(toBytes(key)); + put.add(family.getBytes(), column1.getBytes(), + toBytes(value + "_table" + tableIndex + "_record" + recordIndex)); + hTable.put(put); + + // 验证写入 + Get get = new Get(toBytes(key)); + get.addColumn(family.getBytes(), toBytes(column1)); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + + String expectedValue = value + "_table" + tableIndex + "_record" + recordIndex; + String actualValue = new String(r.getValue(family.getBytes(), column1.getBytes())); + Assert.assertEquals(expectedValue, actualValue); + + System.out.println("Cross-table test: Successfully wrote/read group_00.test$family_" + + String.format("%02d", tableIndex) + " record " + recordIndex + " with key: " + key); + } + } + + // 测试跨表的条件操作 + for (int tableIndex = 0; tableIndex < 5; tableIndex++) { // 测试前5个表 + String checkKey = String.format("00_%02d_CHECK_PUT", tableIndex); + testKeys.add(checkKey); + + Put initialPut = new Put(toBytes(checkKey)); + initialPut.add(family.getBytes(), column1.getBytes(), toBytes("initial")); + hTable.put(initialPut); + + Put updatePut = new Put(toBytes(checkKey)); + updatePut.add(family.getBytes(), column1.getBytes(), toBytes("updated")); + boolean success = hTable.checkAndPut(toBytes(checkKey), family.getBytes(), + column1.getBytes(), toBytes("initial"), updatePut); + Assert.assertTrue("CheckAndPut should succeed for table " + tableIndex, success); + + Get get = new Get(toBytes(checkKey)); + get.addColumn(family.getBytes(), toBytes(column1)); + Result r = hTable.get(get); + Assert.assertEquals("updated", + new String(r.getValue(family.getBytes(), column1.getBytes()))); + + System.out.println("Cross-table CheckAndPut: Successfully updated table " + tableIndex); + } + + } finally { + for (String key : testKeys) { + try { + Delete delete = new Delete(toBytes(key)); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + } catch (Exception e) { + } + } + hTable.close(); + } + } + + /** + * TestCase: 批量操作跨分区测试 + * 测试批量写入和读取跨多个分区的数据 + */ + @Test + public void testBatchOperationsAcrossPartitions() throws Exception { + String family = "family"; + String column1 = "column1"; + String value = "batch_value"; + long timestamp = System.currentTimeMillis(); + + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = ohTablePool.getTable(hTableName); + + List testKeys = new ArrayList<>(); + + try { + int totalSuccessCount = 0; + for (int partition = 0; partition <= 9; partition++) { + List partitionPuts = new ArrayList<>(); + List partitionGets = new ArrayList<>(); + List partitionKeys = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { // 每个分区3条数据 + String key = String.format("%02d_BATCH_KEY_%d_%d", partition, partition, i); + partitionKeys.add(key); + testKeys.add(key); + + Put put = new Put(toBytes(key)); + put.add(family.getBytes(), column1.getBytes(), + toBytes(value + "_" + partition + "_" + i)); + partitionPuts.add(put); + + Get get = new Get(toBytes(key)); + get.addColumn(family.getBytes(), toBytes(column1)); + partitionGets.add(get); + } + + hTable.put(partitionPuts); + System.out.println("Batch put completed for partition " + String.format("%02d", partition) + + ": " + partitionPuts.size() + " records"); + + Result[] results = hTable.get(partitionGets); + Assert.assertEquals(partitionGets.size(), results.length); + + int partitionSuccessCount = 0; + for (int i = 0; i < results.length; i++) { + Result result = results[i]; + if (result != null && result.raw().length > 0) { + String expectedValue = value + "_" + partition + "_" + i; + String actualValue = new String(result.getValue(family.getBytes(), column1.getBytes())); + Assert.assertEquals(expectedValue, actualValue); + partitionSuccessCount++; + } + } + Assert.assertEquals("All batch reads should succeed for partition " + partition, + partitionKeys.size(), partitionSuccessCount); + totalSuccessCount += partitionSuccessCount; + + System.out.println("Batch get completed for partition " + String.format("%02d", partition) + + ": " + partitionSuccessCount + " records verified"); + } + + System.out.println("Total batch operations completed: " + totalSuccessCount + " records across all partitions"); + + } finally { + for (int partition = 0; partition <= 9; partition++) { + List partitionDeletes = new ArrayList<>(); + String partitionPrefix = String.format("%02d_", partition); + + for (String key : testKeys) { + if (key.startsWith(partitionPrefix)) { + Delete delete = new Delete(toBytes(key)); + delete.deleteFamily(family.getBytes()); + partitionDeletes.add(delete); + } + } + + if (!partitionDeletes.isEmpty()) { + hTable.delete(partitionDeletes); + System.out.println("Cleaned up " + partitionDeletes.size() + " records from partition " + + String.format("%02d", partition)); + } + } + hTable.close(); + } + } + + @Test + public void testCheckAndDelete() throws Exception { + String family = "family"; + String column1 = "column1"; + String key1 = "00_TESTDELETEKEY"; + String key2 = "00_TESTDELETEKEY2"; + String value = "value"; + String newValue = "newValue"; + long timestamp = System.currentTimeMillis(); + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = ohTablePool.getTable(hTableName); + + try { + Put put = new Put(key1.getBytes()); + put.add(family.getBytes(), column1.getBytes(), value.getBytes()); + hTable.put(put); + Get get = new Get(key1.getBytes()); + get.addColumn(family.getBytes(), column1.getBytes()); + Result r = hTable.get(get); + Assert.assertEquals(1, r.raw().length); + Delete delete = new Delete(key1.getBytes()); + delete.deleteColumn(family.getBytes(), column1.getBytes()); + boolean ret = hTable.checkAndDelete(key1.getBytes(), family.getBytes(), + column1.getBytes(), value.getBytes(), delete); + Assert.assertTrue(ret); + r = hTable.get(get); + Assert.assertEquals(0, r.raw().length); + } finally { + Delete delete = new Delete(key1.getBytes()); + delete.deleteFamily(family.getBytes()); + hTable.delete(delete); + hTable.close(); + } + } + + /* + create tablegroup if not exists test_multicf SHARDING = 'ADAPTIVE'; + create table if not exists test_multicf$family1_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + + create table if not exists test_multicf$family1_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + + create table if not exists test_multicf$family1_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + + create table if not exists test_multicf$family1_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + + create table if not exists test_multicf$family2_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + + create table if not exists test_multicf$family2_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + + create table if not exists test_multicf$family2_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + + create table if not exists test_multicf$family2_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) + ) TABLEGROUP = 'test_multicf'; + */ + + /** + * TestCase: 多CF功能测试, 每个CF进行分表 + */ + @Ignore + public void testSharding02() throws Exception { + String tableName = "test_multicf"; + String family1 = "family1"; + String family2 = "family2"; + String column1 = "column1"; + List keyList = new ArrayList<>(); + for (int partition = 0; partition <= 9; partition++) { + for (int i = 0; i < 2; i++) { // 每个分区2个key + keyList.add(String.format("%02dTEST_KEY%d_%d", partition, partition, i)); + } + } + String[] key = keyList.toArray(new String[0]); + String value = "value"; + + Configuration conf = new Configuration(); + conf.set(HBASE_OCEANBASE_DDS_APP_NAME, APP_NAME); + conf.set(HBASE_OCEANBASE_DDS_APP_DS_NAME, APP_DS_NAME); + conf.set(HBASE_OCEANBASE_DDS_VERSION, VERSION); + HTableInterface hTable = new OHTable(conf, tableName); + + try { + for (int i = 0; i < key.length; i++) { + Put put = new Put(toBytes(key[i])); + put.addColumn(family1.getBytes(), (column1 + family1).getBytes(), toBytes(value + key[i] + + family1)); + put.addColumn(family2.getBytes(), (column1 + family2).getBytes(), toBytes(value + key[i] + + family2)); + hTable.put(put); + System.out.println("Multi-CF: Successfully wrote key " + key[i] + " to both families"); + } + + for (int i = 0; i < key.length; i++) { + Get get = new Get(toBytes(key[i])); + get.addFamily(family1.getBytes()); + get.addFamily(family2.getBytes()); + Result result = hTable.get(get); + Assert.assertEquals(2, result.raw().length); + System.out.println("Multi-CF: Successfully read key " + key[i] + " from both families"); + } + + List> scans = new ArrayList<>(); + for (int partition = 0; partition <= 9; partition++) { + String startKey = String.format("%02dA", partition); + String endKey = String.format("%02dZ", partition); + scans.add(new Pair<>(toBytes(startKey), toBytes(endKey))); + } + for (int partition = 0; partition < scans.size(); partition++) { + Pair scan = scans.get(partition); + + Scan s = new Scan(scan.getFirst(), scan.getSecond()); + s.addFamily(family1.getBytes()); + ResultScanner scanner = hTable.getScanner(s); + int count1 = 0; + for (Result result : scanner) { + System.out.println("Partition " + String.format("%02d", partition) + + " Family1 Forward Scan result: " + Arrays.toString(result.raw())); + count1++; + } + scanner.close(); + + s = new Scan(scan.getFirst(), scan.getSecond()); + s.addFamily(family2.getBytes()); + scanner = hTable.getScanner(s); + int count2 = 0; + for (Result result : scanner) { + System.out.println("Partition " + String.format("%02d", partition) + + " Family2 Forward Scan result: " + Arrays.toString(result.raw())); + count2++; + } + scanner.close(); + + Assert.assertEquals("Partition " + partition + " should have data in family1", count1, count2); + } + + for (int partition = 0; partition < Math.min(5, scans.size()); partition++) { + Pair scan = scans.get(partition); + + Scan s = new Scan(scan.getSecond(), scan.getFirst()); + s.addFamily(family1.getBytes()); + s.setReversed(true); + ResultScanner scanner = hTable.getScanner(s); + for (Result result : scanner) { + System.out.println("Partition " + String.format("%02d", partition) + + " Family1 Reverse Scan result: " + Arrays.toString(result.raw())); + } + scanner.close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("testSharding02 failed: " + e.getMessage()); + } finally { + for (int i = 0; i < key.length; i++) { + Delete delete = new Delete(toBytes(key[i])); + delete.addFamily(family1.getBytes()); + delete.addFamily(family2.getBytes()); + hTable.delete(delete); + } + } + + hTable.close(); + } +} \ No newline at end of file diff --git a/src/test/java/dds_unit_test_db.sql b/src/test/java/dds_unit_test_db.sql new file mode 100644 index 00000000..0c418684 --- /dev/null +++ b/src/test/java/dds_unit_test_db.sql @@ -0,0 +1,786 @@ +create database if not exists group_00; +create database if not exists group_01; +create database if not exists group_02; +create database if not exists group_03; +create database if not exists group_04; +create database if not exists group_05; +create database if not exists group_06; +create database if not exists group_07; +create database if not exists group_08; +create database if not exists group_09; + +create tablegroup test SHARDING = 'ADAPTIVE'; + +create table if not exists group_00.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_00.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_00.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_00.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_00.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_00.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_00.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_00.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_00.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_00.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + + + +create table if not exists group_01.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_01.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_01.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_01.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_01.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_01.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_01.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_01.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_01.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_01.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_02.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_03.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_03.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_03.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_03.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_03.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_03.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_03.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_03.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_03.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_03.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_04.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_04.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_04.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_04.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_04.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_04.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_04.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_04.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_04.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_04.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_05.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_05.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_05.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_05.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_05.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_05.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_05.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_05.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_05.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_05.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_06.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_06.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_06.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_06.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_06.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_06.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_06.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_06.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_06.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_06.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_07.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_07.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_07.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_07.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_07.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_07.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_07.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_07.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_07.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_07.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_08.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_08.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_08.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_08.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_08.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_08.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_08.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_08.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_08.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_08.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_09.test$family_00( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_09.test$family_01( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_09.test$family_02( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_09.test$family_03( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_09.test$family_04( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_09.test$family_05( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_09.test$family_06( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_09.test$family_07( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; + +create table if not exists group_09.test$family_08( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test'; +create table if not exists group_09.test$family_09( + K varbinary(1024) NOT NULL, + Q varbinary(256) NOT NULL, + T bigint(20) NOT NULL, + V varbinary(1024) NOT NULL, + PRIMARY KEY (K, Q, T) +) TABLEGROUP = 'test';