From d32847616bc558d0b799735e16d1bfe769513125 Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Wed, 30 Oct 2024 11:58:33 +0800 Subject: [PATCH 1/4] ClientAsyncStreamScanner --- .../com/alipay/oceanbase/hbase/OHTable.java | 19 +- .../result/ClientAsyncStreamScanner.java | 251 ++++++++++++++++++ .../hbase/result/ClientStreamScanner.java | 14 +- .../oceanbase/hbase/HTableTestBase.java | 51 ++++ 4 files changed, 324 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 298d0828..32f577f7 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -21,6 +21,7 @@ import com.alipay.oceanbase.hbase.exception.OperationTimeoutException; import com.alipay.oceanbase.hbase.execute.ServerCallable; import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils; +import com.alipay.oceanbase.hbase.result.ClientAsyncStreamScanner; import com.alipay.oceanbase.hbase.result.ClientStreamScanner; import com.alipay.oceanbase.hbase.util.*; import com.alipay.oceanbase.rpc.ObTableClient; @@ -808,8 +809,13 @@ public ResultScanner call() throws IOException { getTargetTableName(tableNameString)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); - return new ClientStreamScanner(clientQueryAsyncStreamResult, - tableNameString, family, true); + if (scan.isAsyncPrefetch()) { + return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult, + tableNameString, family, true, scan.getMaxResultSize()); + } else { + return new ClientStreamScanner(clientQueryAsyncStreamResult, + tableNameString, family, true); + } } else { for (Map.Entry> entry : scan.getFamilyMap() .entrySet()) { @@ -835,8 +841,13 @@ public ResultScanner call() throws IOException { configuration)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); - return new ClientStreamScanner(clientQueryAsyncStreamResult, - tableNameString, family, false); + if (scan.isAsyncPrefetch()) { + return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult, + tableNameString, family, false, scan.getMaxResultSize()); + } else { + return new ClientStreamScanner(clientQueryAsyncStreamResult, + tableNameString, family, false); + } } } } catch (Exception e) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java new file mode 100644 index 00000000..3e19dbcb --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java @@ -0,0 +1,251 @@ +package com.alipay.oceanbase.hbase.result; + +import com.alipay.oceanbase.hbase.util.OHBaseFuncUtils; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; +import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult; +import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.concurrent.LinkedBlockingQueue; + +import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD; + +public class ClientAsyncStreamScanner extends ClientStreamScanner { + private Queue cache; + private long maxCacheSize; + private AtomicLong cacheSizeInBytes; + long maxResultSize; + // exception queue (from prefetch to main scan execution) + private Queue exceptionsQueue; + // prefetch thread to be executed asynchronously + private Thread prefetcher; + // used for testing + private Consumer prefetchListener = null; + + private final Lock lock = new ReentrantLock(); + private final Condition notEmpty = lock.newCondition(); + private final Condition notFull = lock.newCondition(); + + public ClientAsyncStreamScanner(ObTableClientQueryAsyncStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception { + super(streamResult, tableName, family, isTableGroup); + this.maxResultSize = maxResultSize; + initCache(); + loadCache(); + } + + public ClientAsyncStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception { + super(streamResult, tableName, family, isTableGroup); + this.maxResultSize = maxResultSize; + initCache(); + loadCache(); + } + + @VisibleForTesting + public void setPrefetchListener(Consumer prefetchListener) { + this.prefetchListener = prefetchListener; + } + + private void initCache() { + // concurrent cache + maxCacheSize = maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; + cache = new LinkedBlockingQueue<>(); + cacheSizeInBytes = new AtomicLong(0); + exceptionsQueue = new ConcurrentLinkedQueue<>(); + prefetcher = new Thread(new PrefetchRunnable()); + Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher"); + } + + private void loadCache() throws Exception { + if (streamResult.getRowIndex() == -1 && !streamResult.next()) { + return; + } + + long addSize = 0; + while (!streamResult.getCacheRows().isEmpty()) { + try { + checkStatus(); + + List startRow; + + if (streamResult.getRowIndex() != -1) { + startRow = streamResult.getRow(); + } else if (streamResult.next()) { + startRow = streamResult.getRow(); + } else { + return; + } + + byte[][] familyAndQualifier = new byte[2][]; + if (this.isTableGroup) { + // split family and qualifier + familyAndQualifier = OHBaseFuncUtils.extractFamilyFromQualifier((byte[]) startRow + .get(1).getValue()); + this.family = familyAndQualifier[0]; + } else { + familyAndQualifier[1] = (byte[]) startRow.get(1).getValue(); + } + + byte[] sk = (byte[]) startRow.get(0).getValue(); + byte[] sq = familyAndQualifier[1]; + long st = (Long) startRow.get(2).getValue(); + byte[] sv = (byte[]) startRow.get(3).getValue(); + + KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv); + List keyValues = new ArrayList<>(); + keyValues.add(startKeyValue); + addSize = 0; + while (streamResult.next()) { + List row = streamResult.getRow(); + if (this.isTableGroup) { + // split family and qualifier + familyAndQualifier = OHBaseFuncUtils.extractFamilyFromQualifier((byte[]) row + .get(1).getValue()); + this.family = familyAndQualifier[0]; + } else { + familyAndQualifier[1] = (byte[]) row.get(1).getValue(); + } + byte[] k = (byte[]) row.get(0).getValue(); + byte[] q = familyAndQualifier[1]; + long t = (Long) row.get(2).getValue(); + byte[] v = (byte[]) row.get(3).getValue(); + + if (Arrays.equals(sk, k)) { + // when rowKey is equal to the previous rowKey ,merge the result into the same result + KeyValue kv = new KeyValue(k, family, q, t, v); + addSize += PrivateCellUtil.estimatedSizeOfCell(kv); + keyValues.add(kv); + } else { + break; + } + } + cache.add(Result.create(keyValues)); + addEstimatedSize(addSize); + } catch (Exception e) { + logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e); + throw new IOException(String.format("get table %s stream next result error ", + streamResult.getTableName()), e); + } + } + } + + @Override + public Result next() throws IOException { + try { + lock.lock(); + while (cache.isEmpty()) { + handleException(); + if (this.closed) { + return null; + } + try { + notEmpty.await(); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted when wait to load cache"); + } + } + + Result result = pollCache(); + if (prefetchCondition()) { + notFull.signalAll(); + } + return result; + } finally { + lock.unlock(); + handleException(); + } + } + + @Override + public void close() { + try { + lock.lock(); + super.close(); + closed = true; + notFull.signalAll(); + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + } + + private void addEstimatedSize(long estimatedSize) { + cacheSizeInBytes.addAndGet(estimatedSize); + } + + private void handleException() throws IOException { + //The prefetch task running in the background puts any exception it + //catches into this exception queue. + // Rethrow the exception so the application can handle it. + while (!exceptionsQueue.isEmpty()) { + Exception first = exceptionsQueue.peek(); + first.printStackTrace(); + if (first instanceof IOException) { + throw (IOException) first; + } + throw (RuntimeException) first; + } + } + + private boolean prefetchCondition() { + return cacheSizeInBytes.get() < maxCacheSize / 2; + } + + private long estimatedResultSize(Result res) { + long result_size = 0; + for (Cell cell : res.rawCells()) { + result_size += PrivateCellUtil.estimatedSizeOfCell(cell); + } + return result_size; + } + + private Result pollCache() { + Result res = cache.poll(); + if (null != res) { + long estimatedSize = estimatedResultSize(res); + addEstimatedSize(-estimatedSize); + } + return res; + } + + private class PrefetchRunnable implements Runnable { + @Override + public void run() { + while (!closed) { + boolean succeed = false; + try { + lock.lock(); + while (!prefetchCondition()) { + notFull.await(); + } + loadCache(); + succeed = true; + } catch (Exception e) { + exceptionsQueue.add(e); + } finally { + notEmpty.signalAll(); + lock.unlock(); + if (prefetchListener != null) { + prefetchListener.accept(succeed); + } + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java index 35d01867..eee0e978 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java +++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java @@ -39,18 +39,18 @@ @InterfaceAudience.Private public class ClientStreamScanner extends AbstractClientScanner { - private static final Logger logger = TableHBaseLoggerFactory + protected static final Logger logger = TableHBaseLoggerFactory .getLogger(ClientStreamScanner.class); - private final AbstractQueryStreamResult streamResult; + protected final AbstractQueryStreamResult streamResult; - private final String tableName; + protected final String tableName; - private byte[] family; + protected byte[] family; - private boolean closed = false; + protected boolean closed = false; - private boolean isTableGroup = false; + protected boolean isTableGroup = false; public ClientStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup) { @@ -151,7 +151,7 @@ public boolean renewLease() { } } - private void checkStatus() throws IllegalStateException { + void checkStatus() throws IllegalStateException { if (closed) { throw new IllegalStateException("table " + tableName + " family " + Bytes.toString(family) + " scanner is closed"); diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 0264f997..11f522ce 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -18,6 +18,7 @@ package com.alipay.oceanbase.hbase; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.hbase.result.ClientAsyncStreamScanner; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; @@ -36,6 +37,8 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ALL; import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE; @@ -5627,6 +5630,54 @@ public void testScannerMultiVersion() throws Exception { Assert.assertEquals(2, resultList.size()); } + @Test + public void testAsyncPrefetchScanner() throws IOException { + testAsyncPrefetchScannerInner(null); + testAsyncPrefetchScannerInner((b) -> { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException ignored) { + } + }); + } + + public void testAsyncPrefetchScannerInner(Consumer listener) throws IOException { + String key = "async_scanner"; + String column = "column"; + String value = "value"; + String family = "family1"; + Put put; + int row_count = 40; + int column_count = 40; + for (int i = 0; i < row_count; i++) { + String k = key + i; + for (int j = 0; j < column_count; j++) { + put = new Put(k.getBytes()); + put.addColumn(family.getBytes(), Bytes.toBytes(column + j), (value + j).getBytes()); + hTable.put(put); + } + } + + Scan scan = new Scan(); + scan.readVersions(10); + scan.setAsyncPrefetch(true); + ResultScanner scanner = hTable.getScanner(scan); + assertTrue(scanner instanceof ClientAsyncStreamScanner); + ((ClientAsyncStreamScanner) scanner).setPrefetchListener(listener); + + int count = 0; + for (Result res: scanner) { + for (Cell cell: res.rawCells()) { + int rowId = count / row_count; + int columnId = count % row_count; + Assert.assertEquals((key + rowId).getBytes(), CellUtil.cloneRow(cell)); + Assert.assertEquals((column + columnId).getBytes(), CellUtil.cloneQualifier(cell)); + count ++; + } + } + assertEquals(row_count * column_count, count); + } + @Test public void testPutColumnFamilyNull() throws Exception { Put put1 = new Put(("key_c_f").getBytes()); From bb306d7f986e91c2723c645c7d4d3ed74963c12e Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Wed, 30 Oct 2024 14:59:47 +0800 Subject: [PATCH 2/4] refine --- .../com/alipay/oceanbase/hbase/HTableTestBase.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 11f522ce..e3008e39 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -5632,23 +5632,24 @@ public void testScannerMultiVersion() throws Exception { @Test public void testAsyncPrefetchScanner() throws IOException { - testAsyncPrefetchScannerInner(null); - testAsyncPrefetchScannerInner((b) -> { + testAsyncPrefetchScannerInner(40, 40, null); + testAsyncPrefetchScannerInner(40, 40, (b) -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException ignored) { } }); + testAsyncPrefetchScannerInner(40, 40, (b) -> { + System.out.println("prefetch status: " + b); + }); } - public void testAsyncPrefetchScannerInner(Consumer listener) throws IOException { + public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consumer listener) throws IOException { String key = "async_scanner"; String column = "column"; String value = "value"; String family = "family1"; Put put; - int row_count = 40; - int column_count = 40; for (int i = 0; i < row_count; i++) { String k = key + i; for (int j = 0; j < column_count; j++) { From fb5af0e526d72b462e20cdd71aa391ef320e4055 Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 31 Oct 2024 11:37:18 +0800 Subject: [PATCH 3/4] ClientAsyncStreamScanner --- .../com/alipay/oceanbase/hbase/OHTable.java | 27 +++++++++++-- .../result/ClientAsyncStreamScanner.java | 18 +++------ .../oceanbase/hbase/HTableTestBase.java | 38 +++++++++++++++---- 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 32f577f7..7c7cfbb5 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -789,6 +789,11 @@ public ResultScanner call() throws IOException { ObTableQueryAsyncRequest request; ObTableQuery obTableQuery; ObHTableFilter filter; + Boolean async = scan.isAsyncPrefetch(); + if (async == null) { + async = configuration.getBoolean( + Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); + } try { if (scan.getFamilyMap().keySet() == null || scan.getFamilyMap().keySet().isEmpty() @@ -809,9 +814,16 @@ public ResultScanner call() throws IOException { getTargetTableName(tableNameString)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); - if (scan.isAsyncPrefetch()) { + if (async) { + long maxScannerResultSize; + if (scan.getMaxResultSize() > 0) { + maxScannerResultSize = scan.getMaxResultSize(); + } else { + maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + } return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult, - tableNameString, family, true, scan.getMaxResultSize()); + tableNameString, family, true, maxScannerResultSize); } else { return new ClientStreamScanner(clientQueryAsyncStreamResult, tableNameString, family, true); @@ -841,9 +853,16 @@ public ResultScanner call() throws IOException { configuration)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); - if (scan.isAsyncPrefetch()) { + if (async) { + long maxScannerResultSize; + if (scan.getMaxResultSize() > 0) { + maxScannerResultSize = scan.getMaxResultSize(); + } else { + maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + } return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult, - tableNameString, family, false, scan.getMaxResultSize()); + tableNameString, family, false, maxScannerResultSize); } else { return new ClientStreamScanner(clientQueryAsyncStreamResult, tableNameString, family, false); diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java index 3e19dbcb..f856f0e0 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java +++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java @@ -38,6 +38,7 @@ public class ClientAsyncStreamScanner extends ClientStreamScanner { private Thread prefetcher; // used for testing private Consumer prefetchListener = null; + private boolean streamNext = true; private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); @@ -47,14 +48,12 @@ public ClientAsyncStreamScanner(ObTableClientQueryAsyncStreamResult streamResult super(streamResult, tableName, family, isTableGroup); this.maxResultSize = maxResultSize; initCache(); - loadCache(); } public ClientAsyncStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName, byte[] family, boolean isTableGroup, long maxResultSize) throws Exception { super(streamResult, tableName, family, isTableGroup); this.maxResultSize = maxResultSize; initCache(); - loadCache(); } @VisibleForTesting @@ -74,6 +73,7 @@ private void initCache() { private void loadCache() throws Exception { if (streamResult.getRowIndex() == -1 && !streamResult.next()) { + streamNext = false; return; } @@ -82,15 +82,7 @@ private void loadCache() throws Exception { try { checkStatus(); - List startRow; - - if (streamResult.getRowIndex() != -1) { - startRow = streamResult.getRow(); - } else if (streamResult.next()) { - startRow = streamResult.getRow(); - } else { - return; - } + List startRow = streamResult.getRow(); byte[][] familyAndQualifier = new byte[2][]; if (this.isTableGroup) { @@ -111,7 +103,7 @@ private void loadCache() throws Exception { List keyValues = new ArrayList<>(); keyValues.add(startKeyValue); addSize = 0; - while (streamResult.next()) { + while ((streamNext = streamResult.next())) { List row = streamResult.getRow(); if (this.isTableGroup) { // split family and qualifier @@ -149,7 +141,7 @@ private void loadCache() throws Exception { public Result next() throws IOException { try { lock.lock(); - while (cache.isEmpty()) { + while (cache.isEmpty() && streamNext) { handleException(); if (this.closed) { return null; diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index e3008e39..ecf7ea48 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -5631,14 +5631,28 @@ public void testScannerMultiVersion() throws Exception { } @Test - public void testAsyncPrefetchScanner() throws IOException { + public void testAsyncPrefetchScanner1() throws IOException { testAsyncPrefetchScannerInner(40, 40, null); + } + @Test + public void testAsyncPrefetchScanner2() throws IOException { + testAsyncPrefetchScannerInner(4000, 3, null); + } + @Test + public void testAsyncPrefetchScanner3() throws IOException { + testAsyncPrefetchScannerInner(3, 4000, null); + } + @Test + public void testAsyncPrefetchScanner4() throws IOException { testAsyncPrefetchScannerInner(40, 40, (b) -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException ignored) { } }); + } + @Test + public void testAsyncPrefetchScanner5() throws IOException { testAsyncPrefetchScannerInner(40, 40, (b) -> { System.out.println("prefetch status: " + b); }); @@ -5650,17 +5664,25 @@ public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consu String value = "value"; String family = "family1"; Put put; + List puts = new ArrayList<>(); for (int i = 0; i < row_count; i++) { - String k = key + i; + String k = key + String.format("%05d", i); for (int j = 0; j < column_count; j++) { put = new Put(k.getBytes()); - put.addColumn(family.getBytes(), Bytes.toBytes(column + j), (value + j).getBytes()); - hTable.put(put); + put.addColumn(family.getBytes(), Bytes.toBytes(column + String.format("%05d", j)), (value + String.format("%05d", j)).getBytes()); + puts.add(put); + if (puts.size() > 1000) { + hTable.put(puts); + puts.clear(); + } } } + hTable.put(puts); + puts.clear(); Scan scan = new Scan(); scan.readVersions(10); + scan.addFamily(family.getBytes()); scan.setAsyncPrefetch(true); ResultScanner scanner = hTable.getScanner(scan); assertTrue(scanner instanceof ClientAsyncStreamScanner); @@ -5669,10 +5691,10 @@ public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consu int count = 0; for (Result res: scanner) { for (Cell cell: res.rawCells()) { - int rowId = count / row_count; - int columnId = count % row_count; - Assert.assertEquals((key + rowId).getBytes(), CellUtil.cloneRow(cell)); - Assert.assertEquals((column + columnId).getBytes(), CellUtil.cloneQualifier(cell)); + int rowId = count / column_count; + int columnId = count % column_count; + Assert.assertEquals(key + String.format("%05d", rowId), Bytes.toString(CellUtil.cloneRow(cell))); + Assert.assertEquals(column + String.format("%05d", columnId), Bytes.toString(CellUtil.cloneQualifier(cell))); count ++; } } From 5e8a2f3e7854606f09348dfc8101894f10e3e9c1 Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 31 Oct 2024 11:51:34 +0800 Subject: [PATCH 4/4] ClientAsyncStreamScanner --- src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index ecf7ea48..1e981b7c 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -5653,9 +5653,7 @@ public void testAsyncPrefetchScanner4() throws IOException { } @Test public void testAsyncPrefetchScanner5() throws IOException { - testAsyncPrefetchScannerInner(40, 40, (b) -> { - System.out.println("prefetch status: " + b); - }); + testAsyncPrefetchScannerInner(40, 40, Assert::assertTrue); } public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consumer listener) throws IOException { @@ -5699,6 +5697,7 @@ public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consu } } assertEquals(row_count * column_count, count); + scanner.close(); } @Test