Skip to content

Commit 43f59ca

Browse files
JackShi148stuBirdFlymiyuan-ljr
authored
OHBufferedMutatorImpl 2.0 compatibility (#91)
* add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * hbase support batch (#84) * Add DeleteFamilyVersion function and corresponding test cases (#85) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * add single cf timerange setting in Get * single cf setColumnFamilyTimeRange in Get and Scan * optimize code * add DeleteFamilyVersion and test cases * add DeleteFamilyVersion; optimize test cases * add DeleteFamilyVersion test case and pass * format code * delete useless self-defined table * remove DeleteFamilyVersion file and move all cases to MultiColumnFamilyTest * hbase support batchCallBack (#86) * adjust bufferdMutatorImpl 1.x to new batch * bufferedMutator do not retry, batch retry in table client * hbase support batch (#84) * add rpcTimeout and operationTimetout setting in bufferedMutator * fix test * Add DeleteFamilyVersion function and corresponding test cases (#85) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * add single cf timerange setting in Get * single cf setColumnFamilyTimeRange in Get and Scan * optimize code * add DeleteFamilyVersion and test cases * add DeleteFamilyVersion; optimize test cases * add DeleteFamilyVersion test case and pass * format code * delete useless self-defined table * hbase support batchCallBack (#86) * fix test * fix test * init hbase_2.0 bufferedMutatorImpl * pass single bufferedMutator test * remove useless comments * format code * add inherited interface in bufferedMutator; fix concurrent bug in bufferedMuator execution * fix typo * fix exception erros message * update time to wait pool to shutdown * add import * pass test cases after merge --------- Co-authored-by: stuBirdFly <84010733+stuBirdFly@users.noreply.github.com> Co-authored-by: miyuan-ljr <miyuan.ljr@antgroup.com>
1 parent 8751fdc commit 43f59ca

File tree

12 files changed

+1349
-413
lines changed

12 files changed

+1349
-413
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
3737
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3838
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
39-
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
4039
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
4140
import com.alipay.oceanbase.rpc.table.ObKVParams;
4241
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
@@ -198,8 +197,13 @@ public OHTable(Configuration configuration, String tableName) throws IOException
198197
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
199198
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
200199
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
200+
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
201+
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
201202
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
202203
this.tableNameString, ohConnectionConf));
204+
this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
205+
this.obTableClient.setRuntimeRetryTimes(numRetries);
206+
setOperationTimeout(ohConnectionConf.getOperationTimeout());
203207

204208
finishSetUp();
205209
}
@@ -246,8 +250,13 @@ public OHTable(Configuration configuration, final byte[] tableName,
246250
this.executePool = executePool;
247251
this.cleanupPoolOnClose = false;
248252
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
253+
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
254+
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
249255
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
250256
this.tableNameString, ohConnectionConf));
257+
this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
258+
this.obTableClient.setRuntimeRetryTimes(numRetries);
259+
setOperationTimeout(ohConnectionConf.getOperationTimeout());
251260

252261
finishSetUp();
253262
}
@@ -311,8 +320,15 @@ public OHTable(TableName tableName, Connection connection,
311320
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
312321
this.writeBufferSize = connectionConfig.getWriteBufferSize();
313322
this.tableName = tableName.getName();
323+
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
324+
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
314325
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
315326
this.tableNameString, connectionConfig));
327+
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
328+
this.obTableClient.setRuntimeRetryTimes(numRetries);
329+
setOperationTimeout(operationTimeout);
330+
331+
finishSetUp();
316332
}
317333

318334
/**
@@ -833,7 +849,8 @@ private void validatePut(Put put) {
833849
throw new IllegalArgumentException("family is empty");
834850
}
835851
for (Cell kv : entry.getValue()) {
836-
if (kv.getRowLength() + kv.getValueLength() + kv.getQualifierLength() + Bytes.toBytes(kv.getTimestamp()).length + kv.getFamilyLength() > maxKeyValueSize) {
852+
if (kv.getRowLength() + kv.getValueLength() + kv.getQualifierLength()
853+
+ Bytes.toBytes(kv.getTimestamp()).length + kv.getFamilyLength() > maxKeyValueSize) {
837854
throw new IllegalArgumentException("KeyValue size too large");
838855
}
839856
}

src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,19 @@
4949

5050
public class OHTablePool implements Closeable {
5151

52-
private String originTabelName = null;
53-
private final PoolMap<String, Table> tables;
54-
private final int maxSize;
55-
private final PoolMap.PoolType poolType;
56-
private final Configuration config;
57-
private final OHTableFactory tableFactory;
52+
private String originTabelName = null;
53+
private final PoolMap<String, Table> tables;
54+
private final int maxSize;
55+
private final PoolMap.PoolType poolType;
56+
private final Configuration config;
57+
private final OHTableFactory tableFactory;
5858

5959
// A map of table attributes used for the table created by this pool. The map
6060
// key is composed of Table_Name + SEPARATOR + Attribute_Name, and the value
6161
// is byte value of attribute.
62-
private ConcurrentHashMap<String, byte[]> tableAttributes;
62+
private ConcurrentHashMap<String, byte[]> tableAttributes;
6363

64-
private ConcurrentHashMap<String, Object> tableExtendAttributes;
64+
private ConcurrentHashMap<String, Object> tableExtendAttributes;
6565

6666
/**
6767
* Default Constructor. Default HBaseConfiguration and no limit on pool size.

src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public final class OHConstants {
157157

158158
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152L;
159159

160-
public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
160+
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
161161

162162
public static final String SOCKET_TIMEOUT = "ipc.socket.timeout";
163163

src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,8 @@ private static void toParseableByteArray(ByteArrayOutputStream byteStream, Times
355355
}
356356

357357
// ColumnValueFilter('cf','q')
358-
private static void toParseableByteArray(ByteArrayOutputStream byteStream, ColumnValueFilter filter) throws IOException {
358+
private static void toParseableByteArray(ByteArrayOutputStream byteStream,
359+
ColumnValueFilter filter) throws IOException {
359360
byteStream.write(filter.getClass().getSimpleName().getBytes());
360361
byteStream.write('(');
361362
byteStream.write("'".getBytes());

0 commit comments

Comments
 (0)