Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import com.alipay.oceanbase.hbase.util.ObHTableTestUtil;
import com.alipay.oceanbase.hbase.util.TableTemplateManager;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;

import java.util.*;

import static com.alipay.oceanbase.hbase.util.ObHTableSecondaryPartUtil.*;
import static com.alipay.oceanbase.hbase.util.ObHTableTestUtil.FOR_EACH;
import static com.alipay.oceanbase.hbase.util.TableTemplateManager.TIMESERIES_TABLES;
import static com.alipay.oceanbase.hbase.util.TableTemplateManager.TableType.SECONDARY_PARTITIONED_TIME_RANGE_KEY;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -215,4 +216,220 @@ public static void testScanImpl(String tableName) throws Exception {
public void testScan() throws Throwable {
FOR_EACH(tableNames, OHTableTimeSeriesScanTest::testScanImpl);
}

@Test
public void testScanWithLimit() throws Throwable {
FOR_EACH(tableNames, OHTableTimeSeriesScanTest::testScanWithLimit);
}

public static void testScanWithLimit(String tableName) throws Exception {
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName));
hTable.init();

// 准备测试数据
String family = getColumnFamilyName(tableName);
long ts = System.currentTimeMillis();

// 准备更多的测试数据:5个key,每个key有3个列,每个列有4个版本
String keys[] = {"limitKey1", "limitKey2", "limitKey3", "limitKey4", "limitKey5"};
String columns[] = {"limitCol1", "limitCol2", "limitCol3"};
String values[] = {"limitVal1", "limitVal2", "limitVal3", "limitVal4"};
long tss[] = {ts, ts + 1, ts + 2, ts + 3};

// 写入测试数据
for (String key : keys) {
Put put = new Put(toBytes(key));
for (String column : columns) {
for (int i = 0; i < values.length; i++) {
put.addColumn(family.getBytes(), column.getBytes(), tss[i], values[i].getBytes());
}
}
hTable.put(put);
}

// 1. 测试setLimit 小于实际数据量的情况
{
Scan scan = new Scan(keys[0].getBytes(), "limitKey6".getBytes());
scan.addFamily(family.getBytes());
scan.setLimit(3); // 限制返回3个完整行
ResultScanner scanner = hTable.getScanner(scan);
List<Result> results = new ArrayList<>();
for (Result result : scanner) {
results.add(result);
}
// 预期:返回3个完整行
Assert.assertTrue("返回行数应该不超过limit", results.size() == 3);

// 校验每个返回行的数据正确性
for (int i = 0; i < results.size(); i++) {
Result result = results.get(i);
String expectedKey = keys[i]; // 按顺序应该是limitKey1, limitKey2, limitKey3

// 校验行键
Assert.assertEquals("行键应该匹配", expectedKey, Bytes.toString(result.getRow()));

// 校验每个行包含的列数据
Cell[] cells = result.rawCells();
Assert.assertEquals("每行应该包含" + (columns.length * values.length) + "个cell",
columns.length * values.length, cells.length);

// 按列名和时间戳排序校验
sortCells(cells);
int cellIdx = 0;
for (String column : columns) {
for (int j = values.length - 1; j >= 0; j--) { // 按时间戳降序
AssertKeyValue(expectedKey, column, tss[j], values[j], cells[cellIdx++]);
}
}
}
}

// 2. 测试setLimit 大于实际数据量的情况
{
Scan scan = new Scan(keys[0].getBytes(), "limitKey6".getBytes());
scan.addFamily(family.getBytes());
scan.setLimit(10); // 限制返回10个完整行(大于实际数据量5行)
ResultScanner scanner = hTable.getScanner(scan);
List<Result> results = new ArrayList<>();
for (Result result : scanner) {
results.add(result);
}
// 预期:返回所有5个完整行
Assert.assertEquals("应该返回所有数据", keys.length , results.size());

// 校验每个返回行的数据正确性
for (int i = 0; i < results.size(); i++) {
Result result = results.get(i);
String expectedKey = keys[i]; // 按顺序应该是limitKey1到limitKey5

// 校验行键
Assert.assertEquals("行键应该匹配", expectedKey, Bytes.toString(result.getRow()));

// 校验每个行包含的列数据
Cell[] cells = result.rawCells();
Assert.assertEquals("每行应该包含" + (columns.length * values.length) + "个cell",
columns.length * values.length, cells.length);

// 按列名和时间戳排序校验
sortCells(cells);
int cellIdx = 0;
for (String column : columns) {
for (int j = values.length - 1; j >= 0; j--) { // 按时间戳降序
AssertKeyValue(expectedKey, column, tss[j], values[j], cells[cellIdx++]);
}
}
}
}

// 3. 测试setLimit 为1的情况
{
Scan scan = new Scan(keys[0].getBytes(), "limitKey6".getBytes());
scan.addFamily(family.getBytes());
scan.setLimit(1); // 限制返回1个完整行
ResultScanner scanner = hTable.getScanner(scan);
List<Result> results = new ArrayList<>();
for (Result result : scanner) {
results.add(result);
}
// 预期:只返回1个完整行
Assert.assertEquals("应该只返回1个完整行", 1, results.size());

// 校验返回的唯一行的数据正确性
Result result = results.get(0);
String expectedKey = keys[0]; // 应该是limitKey1

// 校验行键
Assert.assertEquals("行键应该匹配", expectedKey, Bytes.toString(result.getRow()));

// 校验该行包含的列数据
Cell[] cells = result.rawCells();
Assert.assertEquals("该行应该包含" + (columns.length * values.length) + "个cell",
columns.length * values.length, cells.length);

// 按列名和时间戳排序校验
sortCells(cells);
int cellIdx = 0;
for (String column : columns) {
for (int j = values.length - 1; j >= 0; j--) { // 按时间戳降序
AssertKeyValue(expectedKey, column, tss[j], values[j], cells[cellIdx++]);
}
}
}

// 4. 测试指定单个列的setLimit
{
Scan scan = new Scan(keys[0].getBytes(), "limitKey6".getBytes());
scan.addColumn(family.getBytes(), columns[0].getBytes());
scan.setLimit(3); // 限制返回3个完整行
ResultScanner scanner = hTable.getScanner(scan);
List<Result> results = new ArrayList<>();
for (Result result : scanner) {
results.add(result);
}
// 预期:返回3个完整行
Assert.assertTrue("返回行数应该不超过limit", results.size() == 3);

// 校验每个返回行的单列数据正确性
for (int i = 0; i < results.size(); i++) {
Result result = results.get(i);
String expectedKey = keys[i]; // 按顺序应该是limitKey1, limitKey2, limitKey3

// 校验行键
Assert.assertEquals("行键应该匹配", expectedKey, Bytes.toString(result.getRow()));

// 校验该行只包含指定列的数据
Cell[] cells = result.rawCells();
Assert.assertEquals("该行应该只包含" + values.length + "个cell(单列多版本)",
values.length, cells.length);

// 按时间戳排序校验单列数据
sortCells(cells);
for (int j = 0; j < cells.length; j++) {
AssertKeyValue(expectedKey, columns[0], tss[values.length - 1 - j],
values[values.length - 1 - j], cells[j]);
}
}
}

// 5. 测试时间范围 + setLimit
{
Scan scan = new Scan(keys[0].getBytes(), "limitKey6".getBytes());
scan.addFamily(family.getBytes());
scan.setTimeStamp(tss[2]); // 只查询特定时间戳的数据
scan.setLimit(3); // 限制返回3个完整行
ResultScanner scanner = hTable.getScanner(scan);
List<Result> results = new ArrayList<>();
for (Result result : scanner) {
results.add(result);
}
// 预期:返回3个完整行,且都是指定时间戳的数据
Assert.assertTrue("返回行数应该不超过limit", results.size() == 3);

// 校验每个返回行的数据正确性
for (int i = 0; i < results.size(); i++) {
Result result = results.get(i);
String expectedKey = keys[i]; // 按顺序应该是limitKey1, limitKey2, limitKey3

// 校验行键
Assert.assertEquals("行键应该匹配", expectedKey, Bytes.toString(result.getRow()));

// 校验该行包含的列数据(只有指定时间戳的数据)
Cell[] cells = result.rawCells();
Assert.assertEquals("该行应该包含" + columns.length + "个cell(指定时间戳)",
columns.length, cells.length);

// 校验每个cell的时间戳和值
for (Cell cell : cells) {
Assert.assertEquals("时间戳应该匹配", tss[2], cell.getTimestamp());
// 校验列名和值
String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
Assert.assertTrue("列名应该在预期范围内",
Arrays.asList(columns).contains(columnName));
String value = Bytes.toString(CellUtil.cloneValue(cell));
Assert.assertEquals("值应该匹配", values[2], value);
}
}
}
System.out.println("testScanWithLimit run successfully");
}
}