1818package com .alipay .oceanbase .hbase .util ;
1919
2020import com .alipay .oceanbase .hbase .OHTable ;
21- import com .google .common .annotations .VisibleForTesting ;
21+ import com .alipay .oceanbase .rpc .exception .ObTableUnexpectedException ;
22+ import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .ObTableBatchOperation ;
2223import org .apache .hadoop .classification .InterfaceAudience ;
2324import org .apache .hadoop .conf .Configuration ;
25+ import org .apache .hadoop .hbase .KeyValue ;
2426import org .apache .hadoop .hbase .TableName ;
2527import org .apache .hadoop .hbase .client .*;
2628import org .slf4j .Logger ;
3133import java .util .concurrent .ExecutorService ;
3234import java .util .concurrent .TimeUnit ;
3335import java .util .concurrent .atomic .AtomicLong ;
34- import static com .alipay .oceanbase .rpc .util .TableClientLoggerFactory .LCD ;
36+
37+ import static com .alipay .oceanbase .hbase .util .TableHBaseLoggerFactory .LCD ;
38+ import static com .alipay .oceanbase .rpc .ObGlobal .*;
3539
3640@ InterfaceAudience .Private
3741public class OHBufferedMutatorImpl implements BufferedMutator {
@@ -40,13 +44,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
4044
4145 private final ExceptionListener listener ;
4246
43- private final OHTable ohTable ;
4447 private final TableName tableName ;
4548 private volatile Configuration conf ;
4649
47- @ VisibleForTesting
50+ private OHTable ohTable ;
4851 final ConcurrentLinkedQueue <Mutation > asyncWriteBuffer = new ConcurrentLinkedQueue <Mutation >();
49- @ VisibleForTesting
5052 AtomicLong currentAsyncBufferSize = new AtomicLong (0 );
5153
5254 private long writeBufferSize ;
@@ -55,9 +57,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
5557 private final ExecutorService pool ;
5658 private int rpcTimeout ;
5759 private int operationTimeout ;
60+ private static final long OB_VERSION_4_2_5_1 = calcVersion (4 , (short ) 2 ,
61+ (byte ) 5 , (byte ) 1 );
5862
59- public OHBufferedMutatorImpl (OHConnectionImpl ohConnection , BufferedMutatorParams params )
60- throws IOException {
63+ public OHBufferedMutatorImpl (OHConnectionImpl ohConnection , BufferedMutatorParams params ,
64+ OHTable ohTable ) throws IOException {
6165 if (ohConnection == null || ohConnection .isClosed ()) {
6266 throw new IllegalArgumentException ("Connection is null or closed." );
6367 }
@@ -77,7 +81,38 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
7781 .getMaxKeyValueSize () : connectionConfig .getMaxKeyValueSize ();
7882
7983 // create an OHTable object to do batch work
80- this .ohTable = new OHTable (tableName , ohConnection , connectionConfig , pool );
84+ if (ohTable != null ) {
85+ this .ohTable = ohTable ;
86+ } else {
87+ this .ohTable = new OHTable (tableName , ohConnection , connectionConfig , pool );
88+ }
89+ }
90+
91+ /**
92+ * only used for OHTable get bufferedMutator
93+ * */
94+ public OHBufferedMutatorImpl (Configuration conf , BufferedMutatorParams params , OHTable ohTable )
95+ throws IOException {
96+ // create an OHTable object to do batch work
97+ if (ohTable == null ) {
98+ throw new ObTableUnexpectedException ("The ohTable is null." );
99+ }
100+ this .ohTable = ohTable ;
101+ // init params in OHBufferedMutatorImpl
102+ this .tableName = params .getTableName ();
103+ this .conf = conf ;
104+ this .listener = params .getListener ();
105+
106+ OHConnectionConfiguration connectionConfig = new OHConnectionConfiguration (conf );
107+ this .pool = params .getPool ();
108+ this .rpcTimeout = connectionConfig .getRpcTimeout ();
109+ this .operationTimeout = connectionConfig .getOperationTimeout ();
110+
111+ this .writeBufferSize = params .getWriteBufferSize () != OHConnectionImpl .BUFFERED_PARAM_UNSET ? params
112+ .getWriteBufferSize () : connectionConfig .getWriteBufferSize ();
113+ this .maxKeyValueSize = params .getMaxKeyValueSize () != OHConnectionImpl .BUFFERED_PARAM_UNSET ? params
114+ .getMaxKeyValueSize () : connectionConfig .getMaxKeyValueSize ();
115+
81116 }
82117
83118 @ Override
@@ -119,14 +154,12 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
119154 validateOperation (m );
120155 toAddSize += m .heapSize ();
121156 }
122-
123157 currentAsyncBufferSize .addAndGet (toAddSize );
124158 asyncWriteBuffer .addAll (mutations );
125159
126160 if (currentAsyncBufferSize .get () > writeBufferSize ) {
127- execute (false );
161+ batchExecute (false );
128162 }
129-
130163 }
131164
132165 /**
@@ -142,10 +175,18 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
142175 }
143176 if (mt instanceof Put ) {
144177 // family empty check is in validatePut
145- HTable .validatePut ((Put ) mt , maxKeyValueSize );
146- OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), true );
178+ OHTable .validatePut ((Put ) mt , maxKeyValueSize );
179+ if (isMultiFamilySupport ()) {
180+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), true );
181+ } else {
182+ OHTable .checkFamilyViolationForOneFamily (mt .getFamilyMap ().keySet ());
183+ }
147184 } else {
148- OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), false );
185+ if (isMultiFamilySupport ()) {
186+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), false );
187+ } else {
188+ OHTable .checkFamilyViolationForOneFamily (mt .getFamilyMap ().keySet ());
189+ }
149190 }
150191 }
151192
@@ -156,7 +197,7 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
156197 * @param flushAll - if true, sends all the writes and wait for all of them to finish before
157198 * returning.
158199 */
159- private void execute (boolean flushAll ) throws IOException {
200+ private void batchExecute (boolean flushAll ) throws IOException {
160201 LinkedList <Mutation > execBuffer = new LinkedList <>();
161202 long dequeuedSize = 0L ;
162203 try {
@@ -172,19 +213,16 @@ private void execute(boolean flushAll) throws IOException {
172213 if (execBuffer .isEmpty ()) {
173214 return ;
174215 }
175- ohTable .batch (execBuffer );
216+ Object [] results = new Object [execBuffer .size ()];
217+ ohTable .batch (execBuffer , results );
176218 // if commit all successfully, clean execBuffer
177219 execBuffer .clear ();
178220 } catch (Exception ex ) {
179- LOGGER .error (LCD .convert ("01-00026" ), ex );
221+ // do not recollect error operations, notify outside
222+ LOGGER .error ("error happens: table name = " , tableName .getNameAsString (), ex );
180223 if (ex .getCause () instanceof RetriesExhaustedWithDetailsException ) {
181- LOGGER .error (tableName + ": One or more of the operations have failed after retries." );
224+ LOGGER .error (tableName . getNameAsString () + ": One or more of the operations have failed after retries." , ex );
182225 RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException ) ex .getCause ();
183- // recollect mutations
184- execBuffer .clear ();
185- for (int i = 0 ; i < retryException .getNumExceptions (); ++i ) {
186- execBuffer .add ((Mutation ) retryException .getRow (i ));
187- }
188226 if (listener != null ) {
189227 listener .onException (retryException , this );
190228 } else {
@@ -194,12 +232,6 @@ private void execute(boolean flushAll) throws IOException {
194232 LOGGER .error ("Errors unrelated to operations occur during mutation operation" , ex );
195233 throw ex ;
196234 }
197- } finally {
198- for (Mutation mutation : execBuffer ) {
199- long size = mutation .heapSize ();
200- currentAsyncBufferSize .addAndGet (size );
201- asyncWriteBuffer .add (mutation );
202- }
203235 }
204236 }
205237
@@ -209,7 +241,7 @@ public void close() throws IOException {
209241 return ;
210242 }
211243 try {
212- execute (true );
244+ batchExecute (true );
213245 } finally {
214246 // the pool in ObTableClient will be shut down too
215247 this .pool .shutdown ();
@@ -234,13 +266,21 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
234266 }
235267 }
236268
269+ /**
270+ * Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf
271+ * */
272+ boolean isMultiFamilySupport () {
273+ return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0 )
274+ || (OB_VERSION >= OB_VERSION_4_3_4_0 );
275+ }
276+
237277 /**
238278 * Force to commit all operations
239279 * do not care whether the pool is shut down or this BufferedMutator is closed
240280 */
241281 @ Override
242282 public void flush () throws IOException {
243- execute (true );
283+ batchExecute (true );
244284 }
245285
246286 @ Override
@@ -258,6 +298,10 @@ public void setOperationTimeout(int operationTimeout) {
258298 this .ohTable .setOperationTimeout (operationTimeout );
259299 }
260300
301+ public long getCurrentBufferSize () {
302+ return currentAsyncBufferSize .get ();
303+ }
304+
261305 @ Deprecated
262306 public List <Row > getWriteBuffer () {
263307 return Arrays .asList (asyncWriteBuffer .toArray (new Row [0 ]));
0 commit comments