diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
index 298d0828..7c7cfbb5 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
@@ -21,6 +21,7 @@
 import com.alipay.oceanbase.hbase.exception.OperationTimeoutException;
 import com.alipay.oceanbase.hbase.execute.ServerCallable;
 import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils;
+import com.alipay.oceanbase.hbase.result.ClientAsyncStreamScanner;
 import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
 import com.alipay.oceanbase.hbase.util.*;
 import com.alipay.oceanbase.rpc.ObTableClient;
@@ -788,6 +789,11 @@ public ResultScanner call() throws IOException {
                 ObTableQueryAsyncRequest request;
                 ObTableQuery obTableQuery;
                 ObHTableFilter filter;
+                Boolean async = scan.isAsyncPrefetch();
+                if (async == null) {
+                    async = configuration.getBoolean(
+                            Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH);
+                }
                 try {
                     if (scan.getFamilyMap().keySet() == null
                         || scan.getFamilyMap().keySet().isEmpty()
@@ -808,8 +814,20 @@ public ResultScanner call() throws IOException {
                             getTargetTableName(tableNameString));
                         clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
                             .execute(request);
-                        return new ClientStreamScanner(clientQueryAsyncStreamResult,
-                            tableNameString, family, true);
+                        if (async) {
+                            long maxScannerResultSize;
+                            if (scan.getMaxResultSize() > 0) {
+                                maxScannerResultSize = scan.getMaxResultSize();
+                            } else {
+                                maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+                                        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+                            }
+                            return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
+                                    tableNameString, family, true, maxScannerResultSize);
+                        } else {
+                            return new ClientStreamScanner(clientQueryAsyncStreamResult,
+                                    tableNameString, family, true);
+                        }
                     } else {
                         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
                             .entrySet()) {
@@ -835,8 +853,20 @@ public ResultScanner call() throws IOException {
                                 configuration));
                             clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
                                 .execute(request);
-                            return new ClientStreamScanner(clientQueryAsyncStreamResult,
-                                tableNameString, family, false);
+                            if (async) {
+                                long maxScannerResultSize;
+                                if (scan.getMaxResultSize() > 0) {
+                                    maxScannerResultSize = scan.getMaxResultSize();
+                                } else {
+                                    maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+                                            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+                                }
+                                return new ClientAsyncStreamScanner(clientQueryAsyncStreamResult,
+                                        tableNameString, family, false, maxScannerResultSize);
+                            } else {
+                                return new ClientStreamScanner(clientQueryAsyncStreamResult,
+                                        tableNameString, family, false);
+                            }
                         }
                     }
                 } catch (Exception e) {
diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java
new file mode 100644
index 00000000..f856f0e0
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientAsyncStreamScanner.java
@@ -0,0 +1,302 @@
+package com.alipay.oceanbase.hbase.result;
+
+import com.alipay.oceanbase.hbase.util.OHBaseFuncUtils;
+import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
+import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
+import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
+
+/**
+ * Scanner that fills a result cache from a background thread while the caller consumes it.
+ *
+ * <p>All access to the cache, to {@code streamNext} and to the underlying stream happens while
+ * holding {@code lock}. The prefetcher pauses while the estimated cache size is at or above
+ * half of {@code maxCacheSize} and stops for good once the stream is exhausted, the scanner is
+ * closed, or a prefetch attempt fails.
+ */
+public class ClientAsyncStreamScanner extends ClientStreamScanner {
+    private Queue<Result> cache;
+    private long maxCacheSize;
+    private AtomicLong cacheSizeInBytes;
+    long maxResultSize;
+    // exception queue (from prefetch to main scan execution)
+    private Queue<Exception> exceptionsQueue;
+    // prefetch thread to be executed asynchronously
+    private Thread prefetcher;
+    // used for testing; set by the caller's thread and read by the prefetch thread
+    private volatile Consumer<Boolean> prefetchListener = null;
+    // false once the stream has reported that no more rows are available
+    private boolean streamNext = true;
+
+    private final Lock lock = new ReentrantLock();
+    private final Condition notEmpty = lock.newCondition();
+    private final Condition notFull = lock.newCondition();
+
+    public ClientAsyncStreamScanner(ObTableClientQueryAsyncStreamResult streamResult,
+                                    String tableName, byte[] family, boolean isTableGroup,
+                                    long maxResultSize) throws Exception {
+        super(streamResult, tableName, family, isTableGroup);
+        this.maxResultSize = maxResultSize;
+        initCache();
+    }
+
+    public ClientAsyncStreamScanner(ObTableClientQueryStreamResult streamResult,
+                                    String tableName, byte[] family, boolean isTableGroup,
+                                    long maxResultSize) throws Exception {
+        super(streamResult, tableName, family, isTableGroup);
+        this.maxResultSize = maxResultSize;
+        initCache();
+    }
+
+    @VisibleForTesting
+    public void setPrefetchListener(Consumer<Boolean> prefetchListener) {
+        this.prefetchListener = prefetchListener;
+    }
+
+    /** Creates the cache and bookkeeping, then hands the prefetch thread to HBase's Threads helper. */
+    private void initCache() {
+        // keep the doubling from overflowing for very large limits
+        maxCacheSize = maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
+        cache = new LinkedBlockingQueue<>();
+        cacheSizeInBytes = new AtomicLong(0);
+        exceptionsQueue = new ConcurrentLinkedQueue<>();
+        prefetcher = new Thread(new PrefetchRunnable());
+        Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
+    }
+
+    /**
+     * Groups consecutive cells that share a row key into {@link Result}s and appends them to
+     * the cache. Must be called with {@code lock} held.
+     *
+     * @throws Exception if the first fetch fails; failures while assembling a row are wrapped
+     *                   in an {@link IOException}
+     */
+    private void loadCache() throws Exception {
+        if (streamResult.getRowIndex() == -1 && !streamResult.next()) {
+            streamNext = false;
+            return;
+        }
+        if (!streamNext) {
+            return;
+        }
+
+        // The row left current by the previous next() call is always consumed first.
+        // NOTE(review): assumes next() removes the current row from getCacheRows(), so gating
+        // the first pass on getCacheRows() alone could strand that row - confirm against the
+        // ObTable client.
+        do {
+            try {
+                checkStatus();
+
+                List<ObObj> startRow = streamResult.getRow();
+                byte[] sq = resolveQualifier(startRow);
+                byte[] sk = (byte[]) startRow.get(0).getValue();
+                long st = (Long) startRow.get(2).getValue();
+                byte[] sv = (byte[]) startRow.get(3).getValue();
+
+                List<Cell> keyValues = new ArrayList<>();
+                keyValues.add(new KeyValue(sk, family, sq, st, sv));
+                while ((streamNext = streamResult.next())) {
+                    List<ObObj> row = streamResult.getRow();
+                    byte[] q = resolveQualifier(row);
+                    byte[] k = (byte[]) row.get(0).getValue();
+                    long t = (Long) row.get(2).getValue();
+                    byte[] v = (byte[]) row.get(3).getValue();
+
+                    if (!Arrays.equals(sk, k)) {
+                        // first cell of the next row: leave it current for the next pass
+                        break;
+                    }
+                    // same row key as the previous cell: merge into the same result
+                    keyValues.add(new KeyValue(k, family, q, t, v));
+                }
+                Result result = Result.create(keyValues);
+                cache.add(result);
+                // account for every cell so that pollCache() subtracts exactly what was added
+                addEstimatedSize(estimatedResultSize(result));
+            } catch (Exception e) {
+                logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e);
+                throw new IOException(String.format("get table %s stream next result error ",
+                    streamResult.getTableName()), e);
+            }
+        } while (streamNext && !streamResult.getCacheRows().isEmpty());
+    }
+
+    /**
+     * Returns the qualifier of {@code row}; for a table group the stored qualifier also carries
+     * the family, which is split off and stored in {@code family}.
+     */
+    private byte[] resolveQualifier(List<ObObj> row) throws Exception {
+        byte[] rawQualifier = (byte[]) row.get(1).getValue();
+        if (!this.isTableGroup) {
+            return rawQualifier;
+        }
+        // split family and qualifier
+        byte[][] familyAndQualifier = OHBaseFuncUtils.extractFamilyFromQualifier(rawQualifier);
+        this.family = familyAndQualifier[0];
+        return familyAndQualifier[1];
+    }
+
+    /**
+     * Returns the next cached result, blocking until the prefetcher delivers one.
+     *
+     * @return the next result, or {@code null} when the scan is exhausted or the scanner was
+     *         closed while waiting
+     * @throws IOException if the prefetcher failed or the calling thread was interrupted
+     */
+    @Override
+    public Result next() throws IOException {
+        Result result;
+        // acquire outside try so that a failed lock() never reaches unlock()
+        lock.lock();
+        try {
+            while (cache.isEmpty() && streamNext) {
+                handleException();
+                if (this.closed) {
+                    return null;
+                }
+                try {
+                    notEmpty.await();
+                } catch (InterruptedException e) {
+                    // restore the flag so callers further up can observe the interruption
+                    Thread.currentThread().interrupt();
+                    InterruptedIOException interrupted = new InterruptedIOException(
+                        "Interrupted when wait to load cache");
+                    interrupted.initCause(e);
+                    throw interrupted;
+                }
+            }
+
+            result = pollCache();
+            if (prefetchCondition()) {
+                notFull.signalAll();
+            }
+        } finally {
+            lock.unlock();
+        }
+        // checked after the lock is released and outside finally, so that a prefetch failure
+        // cannot replace an exception that is already propagating
+        handleException();
+        return result;
+    }
+
+    @Override
+    public void close() {
+        lock.lock();
+        try {
+            super.close();
+        } finally {
+            // always release both waiters, even if closing the stream failed
+            closed = true;
+            notFull.signalAll();
+            notEmpty.signalAll();
+            lock.unlock();
+        }
+    }
+
+    private void addEstimatedSize(long estimatedSize) {
+        cacheSizeInBytes.addAndGet(estimatedSize);
+    }
+
+    /**
+     * Rethrows the failure recorded by the prefetch thread, if any. The failure is left in the
+     * queue on purpose: the prefetcher stops after its first failure, so every later call must
+     * keep failing instead of waiting for results that will never arrive.
+     */
+    private void handleException() throws IOException {
+        Exception failure = exceptionsQueue.peek();
+        if (failure == null) {
+            return;
+        }
+        if (failure instanceof IOException) {
+            throw (IOException) failure;
+        }
+        if (failure instanceof RuntimeException) {
+            throw (RuntimeException) failure;
+        }
+        // other checked exceptions cannot be cast to RuntimeException; wrap them instead
+        throw new IOException("async prefetch for table " + tableName + " failed", failure);
+    }
+
+    private boolean prefetchCondition() {
+        return cacheSizeInBytes.get() < maxCacheSize / 2;
+    }
+
+    private long estimatedResultSize(Result res) {
+        long resultSize = 0;
+        for (Cell cell : res.rawCells()) {
+            resultSize += PrivateCellUtil.estimatedSizeOfCell(cell);
+        }
+        return resultSize;
+    }
+
+    private Result pollCache() {
+        Result res = cache.poll();
+        if (null != res) {
+            addEstimatedSize(-estimatedResultSize(res));
+        }
+        return res;
+    }
+
+    private class PrefetchRunnable implements Runnable {
+        @Override
+        public void run() {
+            boolean keepRunning = true;
+            while (keepRunning) {
+                boolean succeed = false;
+                lock.lock();
+                try {
+                    // re-check closed on every wake-up, otherwise close() cannot stop a
+                    // prefetcher that is parked on a full cache
+                    while (!closed && !prefetchCondition()) {
+                        notFull.await();
+                    }
+                    if (closed) {
+                        keepRunning = false;
+                    } else {
+                        loadCache();
+                        // nothing left to fetch once the stream is exhausted
+                        keepRunning = streamNext;
+                    }
+                    succeed = true;
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    exceptionsQueue.add(e);
+                    keepRunning = false;
+                } catch (Exception e) {
+                    // stop after the first failure instead of retrying in a tight loop
+                    exceptionsQueue.add(e);
+                    keepRunning = false;
+                } finally {
+                    notEmpty.signalAll();
+                    lock.unlock();
+                    Consumer<Boolean> listener = prefetchListener;
+                    if (listener != null) {
+                        listener.accept(succeed);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java
index 35d01867..eee0e978 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java
@@ -39,18 +39,18 @@
 @InterfaceAudience.Private
 public class ClientStreamScanner extends AbstractClientScanner {
 
-    private static final Logger logger = TableHBaseLoggerFactory
+    protected static final Logger logger = TableHBaseLoggerFactory
         .getLogger(ClientStreamScanner.class);
 
-    private final AbstractQueryStreamResult streamResult;
+    protected final AbstractQueryStreamResult streamResult;
 
-    private final String tableName;
+    protected final String tableName;
 
-    private byte[] family;
+    protected byte[] family;
 
-    private boolean closed = false;
+    protected boolean closed = false;
 
-    private boolean isTableGroup = false;
+    protected boolean isTableGroup = false;
 
     public ClientStreamScanner(ObTableClientQueryStreamResult streamResult, String tableName,
                                byte[] family, boolean isTableGroup) {
@@ -151,7 +151,7 @@ public boolean renewLease() {
         }
     }
 
-    private void checkStatus() throws IllegalStateException {
+    protected void checkStatus() throws IllegalStateException {
         if (closed) {
             throw new IllegalStateException("table " + tableName + " family "
                                             + Bytes.toString(family) + " scanner is closed");
diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
index 0264f997..1e981b7c 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
@@ -18,6 +18,7 @@
 package com.alipay.oceanbase.hbase;
 
 import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
+import com.alipay.oceanbase.hbase.result.ClientAsyncStreamScanner;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.filter.*;
@@ -36,6 +37,8 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ALL;
 import static org.apache.hadoop.hbase.filter.FilterList.Operator.MUST_PASS_ONE;
@@ -5627,6 +5630,77 @@ public void testScannerMultiVersion() throws Exception {
         Assert.assertEquals(2, resultList.size());
     }
 
+    @Test
+    public void testAsyncPrefetchScanner1() throws IOException {
+        testAsyncPrefetchScannerInner(40, 40, null);
+    }
+    @Test
+    public void testAsyncPrefetchScanner2() throws IOException {
+        testAsyncPrefetchScannerInner(4000, 3, null);
+    }
+    @Test
+    public void testAsyncPrefetchScanner3() throws IOException {
+        testAsyncPrefetchScannerInner(3, 4000, null);
+    }
+    @Test
+    public void testAsyncPrefetchScanner4() throws IOException {
+        testAsyncPrefetchScannerInner(40, 40, (b) -> {
+            try {
+                TimeUnit.MILLISECONDS.sleep(500);
+            } catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+            }
+        });
+    }
+    @Test
+    public void testAsyncPrefetchScanner5() throws IOException {
+        testAsyncPrefetchScannerInner(40, 40, Assert::assertTrue);
+    }
+
+    public void testAsyncPrefetchScannerInner(int row_count, int column_count, Consumer<Boolean> listener) throws IOException {
+        String key = "async_scanner";
+        String column = "column";
+        String value = "value";
+        String family = "family1";
+        Put put;
+        List<Put> puts = new ArrayList<>();
+        for (int i = 0; i < row_count; i++) {
+            String k = key + String.format("%05d", i);
+            for (int j = 0; j < column_count; j++) {
+                put = new Put(k.getBytes());
+                put.addColumn(family.getBytes(), Bytes.toBytes(column + String.format("%05d", j)), (value + String.format("%05d", j)).getBytes());
+                puts.add(put);
+                if (puts.size() > 1000) {
+                    hTable.put(puts);
+                    puts.clear();
+                }
+            }
+        }
+        hTable.put(puts);
+        puts.clear();
+
+        Scan scan = new Scan();
+        scan.readVersions(10);
+        scan.addFamily(family.getBytes());
+        scan.setAsyncPrefetch(true);
+        int count = 0;
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            assertTrue(scanner instanceof ClientAsyncStreamScanner);
+            ((ClientAsyncStreamScanner) scanner).setPrefetchListener(listener);
+
+            for (Result res: scanner) {
+                for (Cell cell: res.rawCells()) {
+                    int rowId = count / column_count;
+                    int columnId = count % column_count;
+                    Assert.assertEquals(key + String.format("%05d", rowId), Bytes.toString(CellUtil.cloneRow(cell)));
+                    Assert.assertEquals(column + String.format("%05d", columnId), Bytes.toString(CellUtil.cloneQualifier(cell)));
+                    count ++;
+                }
+            }
+        }
+        assertEquals(row_count * column_count, count);
+    }
+
     @Test
     public void testPutColumnFamilyNull() throws Exception {
         Put put1 = new Put(("key_c_f").getBytes());