Skip to content

Commit 24bc2e4

Browse files
JackShi148stuBirdFlymiyuan-ljr
authored
CheckAndMutateBuilder and CheckAndMutate interface (#95)
* add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * hbase support batch (#84) * Add DeleteFamilyVersion function and corresponding test cases (#85) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * add single cf timerange setting in Get * single cf setColumnFamilyTimeRange in Get and Scan * optimize code * add DeleteFamilyVersion and test cases * add DeleteFamilyVersion; optimize test cases * add DeleteFamilyVersion test case and pass * format code * delete useless self-defined table * remove DeleteFamilyVersion file and move all cases to MultiColumnFamilyTest * hbase support batchCallBack (#86) * adjust bufferdMutatorImpl 1.x to new batch * bufferedMutator do not retry, batch retry in table client * hbase support batch (#84) * add rpcTimeout and operationTimetout setting in bufferedMutator * fix test * Add DeleteFamilyVersion function and corresponding test cases (#85) * add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * add single cf timerange setting in Get * single cf setColumnFamilyTimeRange in Get and Scan * optimize code * add DeleteFamilyVersion and test cases * add DeleteFamilyVersion; optimize test cases * add DeleteFamilyVersion test case and pass * format code * delete useless self-defined table * hbase support batchCallBack (#86) * fix test * fix test * init hbase_2.0 bufferedMutatorImpl * pass single bufferedMutator test * remove useless comments * format code * add inherited interface in bufferedMutator; fix concurrent bug in bufferedMuator execution * develop tableBuilder * fix typo * fix exception erros message * update time to wait pool to shutdown * optimize rpcTimeout and operationTimeout setting in OHTable finishSetup * develop ObCheckAndMutateBuilder * add test cases * add import * pass test cases after merge * pass all test cases after merge * remove useless import * use temporary multiCfHTable so that does not influence other cases --------- Co-authored-by: stuBirdFly <84010733+stuBirdFly@users.noreply.github.com> Co-authored-by: miyuan-ljr <miyuan.ljr@antgroup.com>
1 parent 31b2fb4 commit 24bc2e4

File tree

7 files changed

+452
-11
lines changed

7 files changed

+452
-11
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 117 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363

6464
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
6565
import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
66+
import static com.alipay.oceanbase.hbase.util.Preconditions.checkNotNull;
6667
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
6768
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.TABLE_HBASE_LOGGER_SPACE;
6869
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance;
@@ -963,7 +964,7 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
963964
RowMutations rowMutations = new RowMutations(row);
964965
rowMutations.add(put);
965966
try {
966-
return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
967+
return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations);
967968
} catch (Exception e) {
968969
logger.error(LCD.convert("01-00005"), put, tableNameString, e);
969970
throw new IOException("checkAndPut type table:" + tableNameString + " e.msg:"
@@ -1019,7 +1020,7 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
10191020
RowMutations rowMutations = new RowMutations(row);
10201021
rowMutations.add(delete);
10211022
try {
1022-
return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
1023+
return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations);
10231024
} catch (Exception e) {
10241025
logger.error(LCD.convert("01-00005"), delete, tableNameString, e);
10251026
throw new IOException("checkAndDelete type table:" + tableNameString + " e.msg:"
@@ -1032,26 +1033,29 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
10321033
CompareFilter.CompareOp compareOp, byte[] value,
10331034
RowMutations rowMutations) throws IOException {
10341035
try {
1035-
return checkAndMutation(row, family, qualifier, compareOp, value, rowMutations);
1036+
return checkAndMutation(row, family, qualifier, compareOp, value, null, rowMutations);
10361037
} catch (Exception e) {
10371038
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
10381039
throw new IOException("checkAndMutate type table:" + tableNameString + " e.msg:"
10391040
+ e.getMessage() + " error.", e);
10401041
}
10411042
}
10421043

1044+
@Override
1045+
public CheckAndMutateBuilder checkAndMutate(byte[]row , byte[] family) {
1046+
return new ObCheckAndMutateBuilderImpl(row, family);
1047+
}
1048+
10431049
private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value,
1044-
RowMutations rowMutations) throws Exception {
1050+
TimeRange timeRange, RowMutations rowMutations) throws Exception {
10451051
checkArgument(row != null, "row is null");
10461052
checkArgument(isNotBlank(Bytes.toString(family)), "family is blank");
10471053
checkArgument(Bytes.equals(row, rowMutations.getRow()),
1048-
"mutation row is not equal check row");
1049-
1054+
"mutation row is not equal check row");
10501055
checkArgument(!rowMutations.getMutations().isEmpty(), "mutation is empty");
1051-
10521056
byte[] filterString = buildCheckAndMutateFilterString(family, qualifier, compareOp, value);
10531057

1054-
ObHTableFilter filter = buildObHTableFilter(filterString, null, 1, qualifier);
1058+
ObHTableFilter filter = buildObHTableFilter(filterString, timeRange, 1, qualifier);
10551059
List<Mutation> mutations = rowMutations.getMutations();
10561060
List<Cell> keyValueList = new LinkedList<>();
10571061
// only one family operation is allowed
@@ -1911,4 +1915,109 @@ public static OHOpType getDeleteType(Cell.Type type) {
19111915
throw new IllegalArgumentException("illegal mutation type " + type);
19121916
}
19131917
}
1918+
1919+
private class ObCheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
1920+
private final byte[] row;
1921+
private final byte[] family;
1922+
private byte[] qualifier;
1923+
private byte[] value;
1924+
private TimeRange timeRange;
1925+
private CompareOperator cmpOp;
1926+
1927+
ObCheckAndMutateBuilderImpl(byte[] row, byte[] family) {
1928+
this.row = checkNotNull(row, "The provided row is null.");
1929+
this.family = checkNotNull(family, "The provided family is null.");
1930+
}
1931+
1932+
@Override
1933+
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
1934+
this.qualifier = checkNotNull(qualifier, "The provided qualifier is null. You could" +
1935+
" use an empty byte array, or do not call this method if you want a null qualifier.");
1936+
return this;
1937+
}
1938+
1939+
@Override
1940+
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
1941+
this.timeRange = timeRange;
1942+
return this;
1943+
}
1944+
1945+
@Override
1946+
public CheckAndMutateBuilder ifNotExists() {
1947+
this.cmpOp = CompareOperator.EQUAL;
1948+
this.value = null;
1949+
return this;
1950+
}
1951+
1952+
@Override
1953+
public CheckAndMutateBuilder ifMatches(CompareOperator cmpOp, byte[] value) {
1954+
this.cmpOp = checkNotNull(cmpOp, "The provided cmpOp is null.");
1955+
this.value = checkNotNull(value , "The provided value is null.");
1956+
return this;
1957+
}
1958+
1959+
@Override
1960+
public boolean thenPut(Put put) throws IOException {
1961+
checkCmpOp();
1962+
RowMutations rowMutations = new RowMutations(row);
1963+
rowMutations.add(put);
1964+
try {
1965+
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, rowMutations);
1966+
} catch(Exception e) {
1967+
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
1968+
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
1969+
+ e.getMessage() + " error.", e);
1970+
}
1971+
}
1972+
1973+
@Override
1974+
public boolean thenDelete(Delete delete) throws IOException {
1975+
checkCmpOp();
1976+
RowMutations rowMutations = new RowMutations(row);
1977+
rowMutations.add(delete);
1978+
try {
1979+
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, rowMutations);
1980+
} catch(Exception e) {
1981+
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
1982+
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
1983+
+ e.getMessage() + " error.", e);
1984+
}
1985+
}
1986+
1987+
@Override
1988+
public boolean thenMutate(RowMutations mutation) throws IOException {
1989+
checkCmpOp();
1990+
try {
1991+
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value, timeRange, mutation);
1992+
} catch(Exception e) {
1993+
logger.error(LCD.convert("01-00005"), mutation, tableNameString, e);
1994+
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
1995+
+ e.getMessage() + " error.", e);
1996+
}
1997+
}
1998+
1999+
private void checkCmpOp() {
2000+
checkNotNull(this.cmpOp, "The compare condition is null. Please use"
2001+
+ " ifNotExists/ifEquals/ifMatches before executing the request");
2002+
}
2003+
2004+
private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
2005+
switch (cmpOp) {
2006+
case LESS:
2007+
return CompareFilter.CompareOp.LESS;
2008+
case LESS_OR_EQUAL:
2009+
return CompareFilter.CompareOp.LESS_OR_EQUAL;
2010+
case EQUAL:
2011+
return CompareFilter.CompareOp.EQUAL;
2012+
case NOT_EQUAL:
2013+
return CompareFilter.CompareOp.NOT_EQUAL;
2014+
case GREATER_OR_EQUAL:
2015+
return CompareFilter.CompareOp.GREATER_OR_EQUAL;
2016+
case GREATER:
2017+
return CompareFilter.CompareOp.GREATER;
2018+
default:
2019+
return CompareFilter.CompareOp.NO_OP;
2020+
}
2021+
}
2022+
}
19142023
}

src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
173173
return ohTable.checkAndMutate(row, family, qualifier, compareOp, value, mutations);
174174
}
175175

176+
@Override
177+
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
178+
checkStatus();
179+
return ohTable.checkAndMutate(row, family);
180+
}
181+
176182
@Override
177183
public void setOperationTimeout(int i) {
178184
checkStatus();

src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,11 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
826826
return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
827827
}
828828

829+
@Override
830+
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
831+
return table.checkAndMutate(row, family);
832+
}
833+
829834
@Override
830835
public Result increment(Increment increment) throws IOException {
831836
return table.increment(increment);

src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import com.alipay.oceanbase.rpc.constant.Constants;
2222
import com.google.common.base.Objects;
2323
import org.apache.hadoop.classification.InterfaceAudience;
24-
import org.apache.hadoop.conf.Configuration;
25-
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
2624

2725
import java.io.IOException;
2826
import java.util.Map;

src/test/java/com/alipay/oceanbase/hbase/HTableMultiCFTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package com.alipay.oceanbase.hbase;
1919

2020
import com.alipay.oceanbase.hbase.util.OHBufferedMutatorImpl;
21-
import com.alipay.oceanbase.hbase.util.ObHTableTestUtil;
2221
import org.apache.hadoop.conf.Configuration;
2322
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
23+
import com.alipay.oceanbase.hbase.util.ObHTableTestUtil;
2424
import org.apache.hadoop.hbase.Cell;
2525
import org.apache.hadoop.hbase.CellUtil;
2626
import org.apache.hadoop.hbase.TableName;

0 commit comments

Comments
 (0)