Skip to content

Commit 31b2fb4

Browse files
JackShi148stuBirdFlymiyuan-ljr
authored
Develop tableBuilder (#92)
* add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * hbase support batch (#84) * Add DeleteFamilyVersion function and corresponding test cases (#85) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * add single cf timerange setting in Get * single cf setColumnFamilyTimeRange in Get and Scan * optimize code * add DeleteFamilyVersion and test cases * add DeleteFamilyVersion; optimize test cases * add DeleteFamilyVersion test case and pass * format code * delete useless self-defined table * remove DeleteFamilyVersion file and move all cases to MultiColumnFamilyTest * hbase support batchCallBack (#86) * adjust bufferdMutatorImpl 1.x to new batch * bufferedMutator do not retry, batch retry in table client * hbase support batch (#84) * add rpcTimeout and operationTimetout setting in bufferedMutator * fix test * Add DeleteFamilyVersion function and corresponding test cases (#85) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * add single cf timerange setting in Get * single cf setColumnFamilyTimeRange in Get and Scan * optimize code * add DeleteFamilyVersion and test cases * add DeleteFamilyVersion; optimize test cases * add DeleteFamilyVersion test case and pass * format code * delete useless self-defined table * hbase support batchCallBack (#86) * fix test * fix test * init hbase_2.0 bufferedMutatorImpl * pass single bufferedMutator test * remove useless comments * format code * add inherited interface in bufferedMutator; fix concurrent bug in bufferedMuator execution * develop tableBuilder * fix typo * fix exception erros message * update time to wait pool to shutdown * optimize rpcTimeout and operationTimeout setting in OHTable finishSetup * add import * pass test cases after merge * pass all test cases after merge * use temporary multiCfHTable so that does not influence other cases --------- Co-authored-by: stuBirdFly <84010733+stuBirdFly@users.noreply.github.com> Co-authored-by: miyuan-ljr <miyuan.ljr@antgroup.com>
1 parent fe4bf33 commit 31b2fb4

File tree

6 files changed

+309
-13
lines changed

6 files changed

+309
-13
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,16 @@ public class OHTable implements Table {
104104
*/
105105
private int rpcTimeout;
106106

107+
/**
108+
* timeout for each read rpc request
109+
*/
110+
private int readRpcTimeout;
111+
112+
/**
113+
* timeout for each write rpc request
114+
*/
115+
private int writeRpcTimeout;
116+
107117
/**
108118
* if the <code>Get</code> executing pool is specified by user cleanupPoolOnClose will be false ,
109119
* which means that user is responsible for the pool
@@ -292,7 +302,7 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
292302
public OHTable(TableName tableName, Connection connection,
293303
OHConnectionConfiguration connectionConfig, ExecutorService executePool)
294304
throws IOException {
295-
checkArgument(connection.getConfiguration() != null, "configuration is null.");
305+
checkArgument(connection != null, "connection is null.");
296306
checkArgument(tableName != null, "tableName is null.");
297307
checkArgument(connection.getConfiguration() != null, "configuration is null.");
298308
checkArgument(tableName.getName() != null, "tableNameString is null.");
@@ -311,6 +321,8 @@ public OHTable(TableName tableName, Connection connection,
311321
this.cleanupPoolOnClose = false;
312322
}
313323
this.rpcTimeout = connectionConfig.getRpcTimeout();
324+
this.readRpcTimeout = connectionConfig.getReadRpcTimeout();
325+
this.writeRpcTimeout = connectionConfig.getWriteRpcTimeout();
314326
this.operationTimeout = connectionConfig.getOperationTimeout();
315327
this.operationExecuteInPool = this.configuration.getBoolean(
316328
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
@@ -331,6 +343,50 @@ public OHTable(TableName tableName, Connection connection,
331343
finishSetUp();
332344
}
333345

346+
public OHTable(Connection connection, ObTableBuilderBase builder,
347+
OHConnectionConfiguration connectionConfig, ExecutorService executePool)
348+
throws IOException {
349+
checkArgument(connection != null, "connection is null.");
350+
checkArgument(connection.getConfiguration() != null, "configuration is null.");
351+
checkArgument(builder != null, "builder is null");
352+
checkArgument(connectionConfig != null, "connectionConfig is null.");
353+
TableName builderTableName = builder.getTableName();
354+
this.tableName = builderTableName.getName();
355+
this.tableNameString = builderTableName.getNameAsString();
356+
this.configuration = connection.getConfiguration();
357+
this.executePool = executePool;
358+
if (executePool == null) {
359+
int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
360+
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
361+
long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
362+
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
363+
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
364+
this.cleanupPoolOnClose = true;
365+
} else {
366+
this.cleanupPoolOnClose = false;
367+
}
368+
this.rpcTimeout = builder.getRpcTimeout();
369+
this.readRpcTimeout = builder.getReadRpcTimeout();
370+
this.writeRpcTimeout = builder.getWriteRpcTimeout();
371+
this.operationTimeout = builder.getOperationTimeout();
372+
this.operationExecuteInPool = this.configuration.getBoolean(
373+
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
374+
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
375+
this.maxKeyValueSize = connectionConfig.getMaxKeyValueSize();
376+
this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK,
377+
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
378+
this.writeBufferSize = connectionConfig.getWriteBufferSize();
379+
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
380+
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
381+
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
382+
this.tableNameString, connectionConfig));
383+
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
384+
this.obTableClient.setRuntimeRetryTimes(numRetries);
385+
setOperationTimeout(operationTimeout);
386+
387+
finishSetUp();
388+
}
389+
334390
/**
335391
* 创建默认的线程池
336392
* Using the "direct handoff" approach, new threads will only be created
@@ -370,11 +426,15 @@ private void finishSetUp() {
370426
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
371427
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
372428
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
373-
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
374-
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
375-
this.operationTimeout = this.configuration.getInt(
429+
this.rpcTimeout = this.rpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
430+
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.rpcTimeout;
431+
this.readRpcTimeout = this.readRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
432+
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.readRpcTimeout;
433+
this.writeRpcTimeout = this.writeRpcTimeout <= 0 ? configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
434+
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) : this.writeRpcTimeout;
435+
this.operationTimeout = this.operationTimeout <= 0 ? this.configuration.getInt(
376436
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
377-
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
437+
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT) : this.operationTimeout;
378438
this.operationExecuteInPool = this.configuration.getBoolean(
379439
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
380440
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -388,7 +448,7 @@ private void finishSetUp() {
388448

389449
private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
390450
OHConnectionConfiguration ohConnectionConf)
391-
throws IOException {
451+
throws IllegalArgumentException {
392452
if (tableNameString.indexOf(':') != -1) {
393453
String[] params = tableNameString.split(":");
394454
if (params.length != 2) {
@@ -1320,6 +1380,16 @@ public int getRpcTimeout() {
13201380
return this.rpcTimeout;
13211381
}
13221382

1383+
@Override
1384+
public int getReadRpcTimeout() {
1385+
return this.readRpcTimeout;
1386+
}
1387+
1388+
@Override
1389+
public int getWriteRpcTimeout() {
1390+
return this.writeRpcTimeout;
1391+
}
1392+
13231393
public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
13241394
this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
13251395
}

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,13 @@ public class OHConnectionConfiguration {
4747
private final boolean odpMode;
4848
private final long writeBufferSize;
4949
private final int operationTimeout;
50+
private final int metaOperationTimeout;
5051
private final int scannerCaching;
5152
private final long scannerMaxResultSize;
5253
private final int maxKeyValueSize;
5354
private final int rpcTimeout;
55+
private final int readRpcTimeout;
56+
private final int writeRpcTimeout;
5457
private final int rpcConnectTimeout;
5558
private final long writeBufferPeriodicFlushTimeoutMs;
5659
private final long writeBufferPeriodicFlushTimerTickMs;
@@ -70,10 +73,16 @@ public OHConnectionConfiguration(Configuration conf) {
7073
}
7174
this.database = database;
7275
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
76+
this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
77+
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
7378
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
7479
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
7580
this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
7681
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
82+
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
83+
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
84+
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
85+
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
7786
this.writeBufferPeriodicFlushTimeoutMs = conf.getLong(
7887
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT);
7988
this.writeBufferPeriodicFlushTimerTickMs = conf.getLong(
@@ -117,6 +126,10 @@ public long getWriteBufferSize() {
117126
return this.writeBufferSize;
118127
}
119128

129+
public int getMetaOperationTimeout() {
130+
return this.metaOperationTimeout;
131+
}
132+
120133
public int getOperationTimeout() {
121134
return this.operationTimeout;
122135
}
@@ -133,6 +146,14 @@ public int getRpcTimeout() {
133146
return this.rpcTimeout;
134147
}
135148

149+
public int getReadRpcTimeout() {
150+
return this.readRpcTimeout;
151+
}
152+
153+
public int getWriteRpcTimeout() {
154+
return this.writeRpcTimeout;
155+
}
156+
136157
public int getRpcConnectTimeout() {
137158
return this.rpcConnectTimeout;
138159
}

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,19 @@ public boolean isClosed() {
176176
}
177177

178178
@Override
179-
public TableBuilder getTableBuilder(TableName tableName, ExecutorService executorService) {
180-
throw new FeatureNotSupportedException("not supported yet'");
179+
public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
180+
return new ObTableBuilderBase(tableName, connectionConfig) {
181+
@Override
182+
public Table build() {
183+
try {
184+
return new OHTable(OHConnectionImpl.this, this,
185+
OHConnectionImpl.this.connectionConfig, pool);
186+
} catch (Exception e) {
187+
LOGGER.error("Fail to build new OHTable", e);
188+
throw new RuntimeException(e);
189+
}
190+
}
191+
};
181192
}
182193

183194
@Override
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.alipay.oceanbase.hbase.util;
2+
3+
import org.apache.hadoop.hbase.TableName;
4+
import org.apache.hadoop.hbase.client.TableBuilder;
5+
import org.apache.yetus.audience.InterfaceAudience;
6+
7+
@InterfaceAudience.Private
8+
abstract public class ObTableBuilderBase implements TableBuilder {
9+
protected TableName tableName;
10+
11+
protected int operationTimeout;
12+
13+
protected int rpcTimeout;
14+
15+
protected int readRpcTimeout;
16+
17+
protected int writeRpcTimeout;
18+
19+
ObTableBuilderBase(TableName tableName, OHConnectionConfiguration ohConnConf) {
20+
if (tableName == null) {
21+
throw new IllegalArgumentException("The provided tableName is null");
22+
}
23+
this.tableName = tableName;
24+
this.operationTimeout = tableName.isSystemTable() ? ohConnConf.getMetaOperationTimeout()
25+
: ohConnConf.getOperationTimeout();
26+
this.rpcTimeout = ohConnConf.getRpcTimeout();
27+
this.readRpcTimeout = ohConnConf.getReadRpcTimeout();
28+
this.writeRpcTimeout = ohConnConf.getWriteRpcTimeout();
29+
}
30+
31+
@Override
32+
public ObTableBuilderBase setOperationTimeout(int timeout) {
33+
this.operationTimeout = timeout;
34+
return this;
35+
}
36+
37+
@Override
38+
public ObTableBuilderBase setRpcTimeout(int timeout) {
39+
this.rpcTimeout = timeout;
40+
return this;
41+
}
42+
43+
@Override
44+
public ObTableBuilderBase setReadRpcTimeout(int timeout) {
45+
this.readRpcTimeout = timeout;
46+
return this;
47+
}
48+
49+
@Override
50+
public ObTableBuilderBase setWriteRpcTimeout(int timeout) {
51+
this.writeRpcTimeout = timeout;
52+
return this;
53+
}
54+
55+
public TableName getTableName() {
56+
return this.tableName;
57+
}
58+
59+
public int getOperationTimeout() {
60+
return this.operationTimeout;
61+
}
62+
63+
public int getRpcTimeout() {
64+
return this.rpcTimeout;
65+
}
66+
67+
public int getReadRpcTimeout() {
68+
return this.readRpcTimeout;
69+
}
70+
71+
public int getWriteRpcTimeout() {
72+
return this.writeRpcTimeout;
73+
}
74+
}

src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131
import org.junit.*;
3232
import org.junit.rules.ExpectedException;
3333

34+
import java.io.IOException;
3435
import java.util.*;
3536

37+
import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
3638
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
3739
import static org.junit.Assert.*;
3840

@@ -363,6 +365,80 @@ public void testDeleteFamilyVerison() throws Exception {
363365
multiCfHTable.delete(deleteKey3Family);
364366
}
365367

368+
@Test
369+
public void testMultiColumnFamilyTableBuilder() throws Exception {
370+
byte[] family1 = "family_with_group1".getBytes();
371+
byte[] family2 = "family_with_group2".getBytes();
372+
byte[] family3 = "family_with_group3".getBytes();
373+
374+
byte[] family1_column1 = "family1_column1".getBytes();
375+
byte[] family1_column2 = "family1_column2".getBytes();
376+
byte[] family1_column3 = "family1_column3".getBytes();
377+
byte[] family2_column1 = "family2_column1".getBytes();
378+
byte[] family2_column2 = "family2_column2".getBytes();
379+
byte[] family3_column1 = "family3_column1".getBytes();
380+
byte[] family1_value = "VVV1".getBytes();
381+
byte[] family2_value = "VVV2".getBytes();
382+
byte[] family3_value = "VVV3".getBytes();
383+
384+
Configuration conf = ObHTableTestUtil.newConfiguration();
385+
conf.set("rs.list.acquire.read.timeout", "10000");
386+
conf.set(SOCKET_TIMEOUT_CONNECT, "15000");
387+
Connection connection = ConnectionFactory.createConnection(conf);
388+
TableName tableName = TableName.valueOf("test_multi_cf");
389+
TableBuilder builder = connection.getTableBuilder(tableName, null);
390+
// build a OHTable with default params
391+
Table tmpMultiCfHTable = builder.build();
392+
393+
Delete delete = new Delete(toBytes("Key0"));
394+
delete.addFamily(family1);
395+
delete.addFamily(family2);
396+
delete.addFamily(family3);
397+
tmpMultiCfHTable.delete(delete);
398+
399+
Put put = new Put(toBytes("Key0"));
400+
put.addColumn(family1, family1_column1, family1_value);
401+
put.addColumn(family1, family1_column2, family1_value);
402+
put.addColumn(family1, family1_column3, family1_value);
403+
put.addColumn(family2, family2_column1, family2_value);
404+
put.addColumn(family2, family2_column2, family2_value);
405+
put.addColumn(family3, family3_column1, family3_value);
406+
tmpMultiCfHTable.put(put);
407+
408+
int count = 0;
409+
Get get = new Get(toBytes("Key0"));
410+
get.setMaxVersions();
411+
Result r = tmpMultiCfHTable.get(get);
412+
Assert.assertEquals(6, r.rawCells().length);
413+
414+
delete = new Delete(toBytes("Key0"));
415+
delete.addFamily(family1);
416+
delete.addFamily(family2);
417+
delete.addFamily(family3);
418+
tmpMultiCfHTable.delete(delete);
419+
r = tmpMultiCfHTable.get(get);
420+
Assert.assertEquals(0, r.rawCells().length);
421+
422+
// set params for TableBuilder
423+
builder.setOperationTimeout(1500000);
424+
builder.setRpcTimeout(40000);
425+
tmpMultiCfHTable = builder.build();
426+
427+
put = new Put(toBytes("Key0"));
428+
put.addColumn(family1, family1_column1, family1_value);
429+
put.addColumn(family1, family1_column2, family1_value);
430+
put.addColumn(family2, family2_column1, family2_value);
431+
put.addColumn(family3, family3_column1, family3_value);
432+
tmpMultiCfHTable.put(put);
433+
434+
r = tmpMultiCfHTable.get(get);
435+
Assert.assertEquals(4, r.rawCells().length);
436+
437+
tmpMultiCfHTable.delete(delete);
438+
r = tmpMultiCfHTable.get(get);
439+
Assert.assertEquals(0, r.rawCells().length);
440+
}
441+
366442
@Test
367443
public void testMultiColumnFamilyBufferedMutator() throws Exception {
368444
byte[] family1 = "family_with_group1".getBytes();

0 commit comments

Comments
 (0)