diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 35a9e087..ab434994 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -38,6 +38,7 @@ import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult; import com.alipay.oceanbase.rpc.table.ObHBaseParams; import com.alipay.oceanbase.rpc.table.ObKVParams; +import com.alipay.oceanbase.rpc.util.ObBytesString; import com.alipay.sofa.common.thread.SofaThreadPoolExecutor; import com.google.protobuf.Descriptors; @@ -766,9 +767,6 @@ public Result call() throws IOException { // In a Get operation where the family map is greater than 1 or equal to 0, // we handle this by appending the column family to the qualifier on the client side. // The server can then use this information to filter the appropriate column families and qualifiers. - if (!get.getColumnFamilyTimeRange().isEmpty()) { - throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); - } NavigableSet columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR); processColumnFilters(columnFilters, get.getFamilyMap()); obTableQuery = buildObTableQuery(get, columnFilters); @@ -782,17 +780,6 @@ public Result call() throws IOException { for (Map.Entry> entry : get.getFamilyMap() .entrySet()) { family = entry.getKey(); - if (!get.getColumnFamilyTimeRange().isEmpty()) { - Map colFamTimeRangeMap = get.getColumnFamilyTimeRange(); - if (colFamTimeRangeMap.size() > 1) { - throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); - } else if (colFamTimeRangeMap.get(family) == null) { - throw new IllegalArgumentException("Get family is not matched in ColumnFamilyTimeRange"); - } else { - TimeRange tr = colFamTimeRangeMap.get(family); - get.setTimeRange(tr.getMin(), tr.getMax()); - } - } obTableQuery = buildObTableQuery(get, entry.getValue()); request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family), @@ -863,9 +850,6 @@ public ResultScanner call() throws IOException { // In a Scan operation where the family map is greater than 1 or equal to 0, // we handle this by appending the column family to the qualifier on the client side. // The server can then use this information to filter the appropriate column families and qualifiers. - if (!scan.getColumnFamilyTimeRange().isEmpty()) { - throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); - } NavigableSet columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR); processColumnFilters(columnFilters, scan.getFamilyMap()); filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), @@ -882,17 +866,6 @@ public ResultScanner call() throws IOException { for (Map.Entry> entry : scan.getFamilyMap() .entrySet()) { family = entry.getKey(); - if (!scan.getColumnFamilyTimeRange().isEmpty()) { - Map colFamTimeRangeMap = scan.getColumnFamilyTimeRange(); - if (colFamTimeRangeMap.size() > 1) { - throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now"); - } else if (colFamTimeRangeMap.get(family) == null) { - throw new IllegalArgumentException("Scan family is not matched in ColumnFamilyTimeRange"); - } else { - TimeRange tr = colFamTimeRangeMap.get(family); - scan.setTimeRange(tr.getMin(), tr.getMax()); - } - } filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), scan.getMaxVersions(), entry.getValue()); obTableQuery = buildObTableQuery(filter, scan); @@ -994,6 +967,9 @@ private ObKVParams buildOBKVParams(final Scan scan) { obHBaseParams.setCallTimeout(scannerTimeout); obHBaseParams.setCacheBlock(scan.isGetScan()); obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults()); + for (Map.Entry entry: scan.getColumnFamilyTimeRange().entrySet()) { + obHBaseParams.addFamilyTimeRange(new ObBytesString(entry.getKey()), entry.getValue().getMin(), entry.getValue().getMax()); + } } obKVParams.setObParamsBase(obHBaseParams); return obKVParams; @@ -1004,6 +980,9 @@ private ObKVParams buildOBKVParams(final Get get) { ObHBaseParams obHBaseParams = new ObHBaseParams(); obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly()); obHBaseParams.setCacheBlock(get.getCacheBlocks()); + for (Map.Entry entry: get.getColumnFamilyTimeRange().entrySet()) { + obHBaseParams.addFamilyTimeRange(new ObBytesString(entry.getKey()), entry.getValue().getMin(), entry.getValue().getMax()); + } obKVParams.setObParamsBase(obHBaseParams); return obKVParams; } diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java index 89660ddb..0afe48c2 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import com.alipay.oceanbase.rpc.mutation.result.MutationResult; import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; @@ -29,6 +31,7 @@ import org.junit.*; import org.junit.rules.ExpectedException; +import java.io.IOException; import java.util.*; import static org.apache.hadoop.hbase.util.Bytes.toBytes; @@ -1261,4 +1264,111 @@ public void testMultiColumnFamilyDelete() throws Exception { .getTimestamp()); assertTrue(lastTimestamp > oldTimestamp); } + + private void prepare_time_range_test_data() throws IOException { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + byte[] key = "KEY".getBytes(); + byte[] column = "COLUMN".getBytes(); + + Put put = new Put(key); + for (int i = 0; i < 60; i += 3) { + put.addColumn(family1, column, i, ("value" + i).getBytes()); + } + multiCfHTable.put(put); + put = new Put(key); + for (int i = 0; i < 60; i += 3) { + put.addColumn(family2, column, i, ("value" + i).getBytes()); + } + multiCfHTable.put(put); + put = new Put(key); + for (int i = 0; i < 60; i += 3) { + put.addColumn(family3, column, i, ("value" + i).getBytes()); + } + multiCfHTable.put(put); + } + + @Test + public void test_column_family_time_range() throws IOException { + byte[] family1 = "family_with_group1".getBytes(); + byte[] family2 = "family_with_group2".getBytes(); + byte[] family3 = "family_with_group3".getBytes(); + prepare_time_range_test_data(); + // 20 cell for every family. + + long min = 10; + long max = 20; + Get get = new Get(toBytes("KEY")); + get.setMaxVersions(999); + get.addFamily(family1); + get.setColumnFamilyTimeRange(family1, min, max); + Result result = multiCfHTable.get(get); + System.out.println("test1 start:"); + for (Cell keyValue : result.rawCells()) { + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(result.getRow()), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); + assert(keyValue.getTimestamp() >= min && keyValue.getTimestamp() < max); + } + assertEquals(3, result.rawCells().length); + + get = new Get(toBytes("KEY")); + get.setMaxVersions(999); + get.setColumnFamilyTimeRange(family1, 0, 10); + get.setColumnFamilyTimeRange(family2, 0, 20); + get.setColumnFamilyTimeRange(family3, 0, 30); + result = multiCfHTable.get(get); + System.out.println("\ntest2 start:"); + for (Cell keyValue : result.rawCells()) { + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(result.getRow()), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); + } + assertEquals(21, result.rawCells().length); + + get = new Get(toBytes("KEY")); + get.setMaxVersions(999); + get.setColumnFamilyTimeRange(family1, 0, 10); + get.setColumnFamilyTimeRange(family3, 0, 30); + get.setTimeRange(54, 60); + result = multiCfHTable.get(get); + System.out.println("\ntest3 start:"); + for (Cell keyValue : result.rawCells()) { + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(result.getRow()), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); + } + assertEquals(16, result.rawCells().length); + + get = new Get(toBytes("KEY")); + get.setMaxVersions(6); + get.setColumnFamilyTimeRange(family1, 0, 10); + get.setColumnFamilyTimeRange(family3, 0, 30); + get.setTimeRange(54, 60); + result = multiCfHTable.get(get); + System.out.println("\ntest3 start:"); + for (Cell keyValue : result.rawCells()) { + System.out + .printf( + "Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n", + Bytes.toString(result.getRow()), + Bytes.toString(CellUtil.cloneFamily(keyValue)), + Bytes.toString(CellUtil.cloneQualifier(keyValue)), keyValue.getTimestamp(), + Bytes.toString(CellUtil.cloneValue(keyValue))); + } + assertEquals(12, result.rawCells().length); + } }