1818package com .alipay .oceanbase .hbase .util ;
1919
2020import com .alipay .oceanbase .hbase .OHTable ;
21- import com .alipay .oceanbase .rpc .ObTableClient ;
22- import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .*;
21+ import com .google .common .annotations .VisibleForTesting ;
2322import org .apache .hadoop .classification .InterfaceAudience ;
2423import org .apache .hadoop .conf .Configuration ;
25- import org .apache .hadoop .hbase .KeyValue ;
2624import org .apache .hadoop .hbase .TableName ;
2725import org .apache .hadoop .hbase .client .*;
28- import org .apache .hadoop .hbase .util .Bytes ;
2926import org .slf4j .Logger ;
3027
3128import java .io .IOException ;
3431import java .util .concurrent .ExecutorService ;
3532import java .util .concurrent .TimeUnit ;
3633import java .util .concurrent .atomic .AtomicLong ;
37- import java .util .concurrent .atomic .AtomicReference ;
3834import static com .alipay .oceanbase .rpc .util .TableClientLoggerFactory .LCD ;
3935
4036@ InterfaceAudience .Private
4137public class OHBufferedMutatorImpl implements BufferedMutator {
42- private static final Logger LOGGER = TableHBaseLoggerFactory
43- .getLogger (OHBufferedMutatorImpl .class );
38+ private static final Logger LOGGER = TableHBaseLoggerFactory
39+ .getLogger (OHBufferedMutatorImpl .class );
4440
45- private final ExceptionListener listener ;
41+ private final ExceptionListener listener ;
4642
47- protected final ObTableClient obTableClient ;
48- private final TableName tableName ;
49- private volatile Configuration conf ;
50- private final OHConnectionConfiguration connectionConfig ;
43+ private final OHTable ohTable ;
44+ private final TableName tableName ;
45+ private volatile Configuration conf ;
5146
52- final ConcurrentLinkedQueue <Mutation > asyncWriteBuffer = new ConcurrentLinkedQueue <Mutation >();
53- AtomicLong currentAsyncBufferSize = new AtomicLong (0 );
47+ @ VisibleForTesting
48+ final ConcurrentLinkedQueue <Mutation > asyncWriteBuffer = new ConcurrentLinkedQueue <Mutation >();
49+ @ VisibleForTesting
50+ AtomicLong currentAsyncBufferSize = new AtomicLong (0 );
5451
55- private AtomicReference < Class <?>> type = new AtomicReference <>( null ) ;
56- private final long writeBufferSize ;
57- private final int maxKeyValueSize ;
58- private boolean closed = false ;
59- private final ExecutorService pool ;
60- private final int rpcTimeout ;
52+ private long writeBufferSize ;
53+ private final int maxKeyValueSize ;
54+ private boolean closed = false ;
55+ private final ExecutorService pool ;
56+ private int rpcTimeout ;
57+ private int operationTimeout ;
6158
6259 public OHBufferedMutatorImpl (OHConnectionImpl ohConnection , BufferedMutatorParams params )
6360 throws IOException {
6461 if (ohConnection == null || ohConnection .isClosed ()) {
6562 throw new IllegalArgumentException ("Connection is null or closed." );
6663 }
67- // create a ObTableClient to do rpc operations
68- this .obTableClient = ObTableClientManager .getOrCreateObTableClient (ohConnection
69- .getOHConnectionConfiguration ());
70-
7164 // init params in OHBufferedMutatorImpl
7265 this .tableName = params .getTableName ();
7366 this .conf = ohConnection .getConfiguration ();
74- this .connectionConfig = ohConnection .getOHConnectionConfiguration ();
7567 this .listener = params .getListener ();
68+
69+ OHConnectionConfiguration connectionConfig = ohConnection .getOHConnectionConfiguration ();
7670 this .pool = params .getPool ();
77- this .obTableClient .setRuntimeBatchExecutor (pool );
71+ this .rpcTimeout = connectionConfig .getRpcTimeout ();
72+ this .operationTimeout = connectionConfig .getOperationTimeout ();
7873
7974 this .writeBufferSize = params .getWriteBufferSize () != OHConnectionImpl .BUFFERED_PARAM_UNSET ? params
8075 .getWriteBufferSize () : connectionConfig .getWriteBufferSize ();
8176 this .maxKeyValueSize = params .getMaxKeyValueSize () != OHConnectionImpl .BUFFERED_PARAM_UNSET ? params
8277 .getMaxKeyValueSize () : connectionConfig .getMaxKeyValueSize ();
83- this .rpcTimeout = connectionConfig .getRpcTimeout ();
84- this .obTableClient .setRpcExecuteTimeout (rpcTimeout );
78+
79+ // create an OHTable object to do batch work
80+ this .ohTable = new OHTable (tableName , ohConnection , connectionConfig , pool );
8581 }
8682
8783 @ Override
@@ -119,38 +115,37 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
119115 }
120116
121117 long toAddSize = 0 ;
122- // check if every mutation's family is the same
123- // check if mutations are the same type
124118 for (Mutation m : mutations ) {
125- OHTable .checkFamilyViolation (m .getFamilyMap ().keySet (), true );
126- validateInsUpAndDelete (m );
127- Class <?> curType = m .getClass ();
128- // set the type of this BufferedMutator
129- if (type .get () == null ) {
130- type .compareAndSet (null , mutations .get (0 ).getClass ());
131- }
132- if (!type .get ().equals (curType )) {
133- throw new IllegalArgumentException ("Not support different type in one batch." );
134- }
119+ validateOperation (m );
135120 toAddSize += m .heapSize ();
136121 }
137122
138123 currentAsyncBufferSize .addAndGet (toAddSize );
139124 asyncWriteBuffer .addAll (mutations );
140125
141- asyncExecute (false );
126+ if (currentAsyncBufferSize .get () > writeBufferSize ) {
127+ execute (false );
128+ }
129+
142130 }
143131
144132 /**
145133 * Check whether the mutation is Put or Delete in 1.x
146134 * @param mt - mutation operation
147135 */
148- private void validateInsUpAndDelete (Mutation mt ) throws IllegalArgumentException {
136+ private void validateOperation (Mutation mt ) throws IllegalArgumentException {
137+ if (mt == null ) {
138+ throw new IllegalArgumentException ("Mutation operation cannot be null" );
139+ }
149140 if (!(mt instanceof Put ) && !(mt instanceof Delete )) {
150141 throw new IllegalArgumentException ("Only support for Put and Delete for now." );
151142 }
152143 if (mt instanceof Put ) {
144+ // family empty check is in validatePut
153145 HTable .validatePut ((Put ) mt , maxKeyValueSize );
146+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), true );
147+ } else {
148+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), false );
154149 }
155150 }
156151
@@ -161,91 +156,49 @@ private void validateInsUpAndDelete(Mutation mt) throws IllegalArgumentException
161156 * @param flushAll - if true, sends all the writes and wait for all of them to finish before
162157 * returning.
163158 */
164- private void asyncExecute (boolean flushAll ) throws IOException {
159+ private void execute (boolean flushAll ) throws IOException {
165160 LinkedList <Mutation > execBuffer = new LinkedList <>();
166- ObTableBatchOperationRequest request = null ;
167- // namespace n1, n1:table_name
168- // namespace default, table_name
169- String tableNameString = tableName .getNameAsString ();
161+ long dequeuedSize = 0L ;
170162 try {
171- while (true ) {
172- try {
173- if (!flushAll || asyncWriteBuffer .isEmpty ()) {
174- if (currentAsyncBufferSize .get () <= writeBufferSize ) {
175- break ;
176- }
177- }
178- Mutation m ;
179- while ((m = asyncWriteBuffer .poll ()) != null ) {
180- execBuffer .add (m );
181- long size = m .heapSize ();
182- currentAsyncBufferSize .addAndGet (-size );
183- }
184- // in concurrent situation, asyncWriteBuffer may be empty here
185- // for other threads flush all buffer
186- if (execBuffer .isEmpty ()) {
187- break ;
188- }
189- // for now, operations' family is the same
190- byte [] family = execBuffer .getFirst ().getFamilyMap ().firstKey ();
191- ObTableBatchOperation batch = buildObTableBatchOperation (execBuffer );
192- // table_name$cf_name
193- String targetTableName = OHTable .getTargetTableName (tableNameString , Bytes .toString (family ), conf );
194- request = OHTable .buildObTableBatchOperationRequest (batch , targetTableName );
195- } catch (Exception ex ) {
196- LOGGER .error ("Errors occur before mutation operation" , ex );
197- throw new IllegalArgumentException ("Errors occur before mutation operation" , ex );
198- }
199- try {
200- ObTableBatchOperationResult result = (ObTableBatchOperationResult ) obTableClient .execute (request );
201- } catch (Exception ex ) {
202- LOGGER .debug ("Errors occur during mutation operation" , ex );
203- Mutation m = null ;
204- try {
205- // retry every single operation
206- while (!execBuffer .isEmpty ()) {
207- // poll elements from execBuffer to recollect remaining operations
208- m = execBuffer .poll ();
209- byte [] family = m .getFamilyMap ().firstKey ();
210- ObTableBatchOperation batch = buildObTableBatchOperation (Collections .singletonList (m ));
211- String targetTableName = OHTable .getTargetTableName (tableNameString , Bytes .toString (family ), conf );
212- request = OHTable .buildObTableBatchOperationRequest (batch , targetTableName );
213- ObTableBatchOperationResult result = (ObTableBatchOperationResult ) obTableClient .execute (request );
214- }
215- } catch (Exception newEx ) {
216- if (m != null ) {
217- execBuffer .addFirst (m );
218- }
219- // if retry fails, only recollect remaining operations
220- while (!execBuffer .isEmpty ()) {
221- m = execBuffer .poll ();
222- long size = m .heapSize ();
223- asyncWriteBuffer .add (m );
224- currentAsyncBufferSize .addAndGet (size );
225- }
226- throw newEx ;
227- }
228- }
163+ Mutation m ;
164+ while ((writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2 ) || flushAll )
165+ && (m = asyncWriteBuffer .poll ()) != null ) {
166+ execBuffer .add (m );
167+ long size = m .heapSize ();
168+ currentAsyncBufferSize .addAndGet (-size );
169+ dequeuedSize += size ;
170+ }
171+
172+ if (execBuffer .isEmpty ()) {
173+ return ;
229174 }
175+ ohTable .batch (execBuffer );
176+ // if commit all successfully, clean execBuffer
177+ execBuffer .clear ();
230178 } catch (Exception ex ) {
231179 LOGGER .error (LCD .convert ("01-00026" ), ex );
232- // if the cause is illegal argument, directly throw to user
233- if (ex instanceof IllegalArgumentException ) {
234- throw (IllegalArgumentException ) ex ;
235- }
236- // TODO: need to collect error information and actions during batch operations
237- // TODO: maybe keep in ObTableBatchOperationResult
238- List <Throwable > throwables = new ArrayList <Throwable >();
239- List <Row > actions = new ArrayList <Row >();
240- List <String > addresses = new ArrayList <String >();
241- throwables .add (ex );
242- RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException (
243- new ArrayList <Throwable >(throwables ),
244- new ArrayList <Row >(actions ), new ArrayList <String >(addresses ));
245- if (listener == null ) {
246- throw error ;
180+ if (ex .getCause () instanceof RetriesExhaustedWithDetailsException ) {
181+ LOGGER .error (tableName + ": One or more of the operations have failed after retries." );
182+ RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException ) ex .getCause ();
183+ // recollect mutations
184+ execBuffer .clear ();
185+ for (int i = 0 ; i < retryException .getNumExceptions (); ++i ) {
186+ execBuffer .add ((Mutation ) retryException .getRow (i ));
187+ }
188+ if (listener != null ) {
189+ listener .onException (retryException , this );
190+ } else {
191+ throw retryException ;
192+ }
247193 } else {
248- listener .onException (error , this );
194+ LOGGER .error ("Errors unrelated to operations occur during mutation operation" , ex );
195+ throw ex ;
196+ }
197+ } finally {
198+ for (Mutation mutation : execBuffer ) {
199+ long size = mutation .heapSize ();
200+ currentAsyncBufferSize .addAndGet (size );
201+ asyncWriteBuffer .add (mutation );
249202 }
250203 }
251204 }
@@ -256,7 +209,7 @@ public void close() throws IOException {
256209 return ;
257210 }
258211 try {
259- asyncExecute (true );
212+ execute (true );
260213 } finally {
261214 // the pool in ObTableClient will be shut down too
262215 this .pool .shutdown ();
@@ -273,27 +226,40 @@ public void close() throws IOException {
273226 }
274227 }
275228
229+ @ Deprecated
230+ public void setWriteBufferSize (long writeBufferSize ) throws IOException {
231+ this .writeBufferSize = writeBufferSize ;
232+ if (currentAsyncBufferSize .get () > writeBufferSize ) {
233+ flush ();
234+ }
235+ }
236+
276237 /**
277238 * Force to commit all operations
278239 * do not care whether the pool is shut down or this BufferedMutator is closed
279240 */
280241 @ Override
281242 public void flush () throws IOException {
282- asyncExecute (true );
243+ execute (true );
283244 }
284245
285246 @ Override
286247 public long getWriteBufferSize () {
287248 return this .writeBufferSize ;
288249 }
289250
290- private ObTableBatchOperation buildObTableBatchOperation (List <? extends Mutation > execBuffer ) {
291- List <KeyValue > keyValueList = new LinkedList <>();
292- for (Mutation mutation : execBuffer ) {
293- for (Map .Entry <byte [], List <KeyValue >> entry : mutation .getFamilyMap ().entrySet ()) {
294- keyValueList .addAll (entry .getValue ());
295- }
296- }
297- return OHTable .buildObTableBatchOperation (keyValueList , false , null );
251+ public void setRpcTimeout (int rpcTimeout ) {
252+ this .rpcTimeout = rpcTimeout ;
253+ this .ohTable .setRpcTimeout (rpcTimeout );
254+ }
255+
256+ public void setOperationTimeout (int operationTimeout ) {
257+ this .operationTimeout = operationTimeout ;
258+ this .ohTable .setOperationTimeout (operationTimeout );
259+ }
260+
261+ @ Deprecated
262+ public List <Row > getWriteBuffer () {
263+ return Arrays .asList (asyncWriteBuffer .toArray (new Row [0 ]));
298264 }
299265}
0 commit comments