4141import org .apache .kafka .clients .admin .TopicListing ;
4242import org .apache .kafka .clients .consumer .KafkaConsumer ;
4343import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
44+ import org .apache .kafka .common .KafkaException ;
4445import org .apache .kafka .common .Node ;
4546import org .apache .kafka .common .TopicPartitionInfo ;
4647import org .apache .kafka .common .config .ConfigResource ;
@@ -114,7 +115,9 @@ public TopicList getAvailableTopics() {
114115 );
115116 }
116117 return new TopicList (topicListings );
117- } catch (InterruptedException | ExecutionException e ) {
118+ } catch (final ExecutionException e ) {
119+ throw handleExecutionException (e );
120+ } catch (final InterruptedException e ) {
118121 // TODO Handle
119122 throw new RuntimeException (e .getMessage (), e );
120123 }
@@ -128,12 +131,13 @@ public NodeList getClusterNodes() {
128131
129132 try {
130133 final Collection <Node > nodes = adminClient .describeCluster ().nodes ().get ();
131- for (final Node node : nodes ) {
134+ for (final Node node : nodes ) {
132135 nodeDetails .add (new NodeDetails (node .id (), node .host (), node .port (), node .rack ()));
133136 }
134137 return new NodeList (nodeDetails );
135- } catch (InterruptedException | ExecutionException e ) {
136- // TODO Handle
138+ } catch (final ExecutionException e ) {
139+ throw handleExecutionException (e );
140+ } catch (final InterruptedException e ) {
137141 throw new RuntimeException (e .getMessage (), e );
138142 }
139143 }
@@ -201,7 +205,9 @@ public Map<String, TopicDetails> getTopicDetails(final Collection<String> topics
201205 }
202206 // Return it
203207 return results ;
204- } catch (final InterruptedException | ExecutionException exception ) {
208+ } catch (final ExecutionException e ) {
209+ throw handleExecutionException (e );
210+ } catch (final InterruptedException exception ) {
205211 // TODO Handle this
206212 throw new RuntimeException (exception .getMessage (), exception );
207213 }
@@ -252,7 +258,9 @@ public boolean createTopic(final CreateTopic createTopic) {
252258
253259 // return true?
254260 return true ;
255- } catch (final InterruptedException | ExecutionException exception ) {
261+ } catch (final ExecutionException e ) {
262+ throw handleExecutionException (e );
263+ } catch (final InterruptedException exception ) {
256264 // TODO Handle this
257265 throw new RuntimeException (exception .getMessage (), exception );
258266 }
@@ -287,7 +295,9 @@ public TopicConfig alterTopicConfig(final String topic, final Map<String, String
287295
288296 // Lets return updated topic details
289297 return getTopicConfig (topic );
290- } catch (final InterruptedException | ExecutionException exception ) {
298+ } catch (final ExecutionException e ) {
299+ throw handleExecutionException (e );
300+ } catch (final InterruptedException exception ) {
291301 // TODO Handle this
292302 throw new RuntimeException (exception .getMessage (), exception );
293303 }
@@ -307,7 +317,9 @@ public boolean removeTopic(final String topic) {
307317
308318 // return true?
309319 return true ;
310- } catch (final InterruptedException | ExecutionException exception ) {
320+ } catch (final ExecutionException e ) {
321+ throw handleExecutionException (e );
322+ } catch (final InterruptedException exception ) {
311323 // TODO Handle this
312324 throw new RuntimeException (exception .getMessage (), exception );
313325 }
@@ -340,8 +352,9 @@ public List<ConsumerGroupIdentifier> listConsumers() {
340352
341353 // return immutable list.
342354 return Collections .unmodifiableList (consumerIds );
343-
344- } catch (final InterruptedException | ExecutionException e ) {
355+ } catch (final ExecutionException e ) {
356+ throw handleExecutionException (e );
357+ } catch (final InterruptedException e ) {
345358 throw new RuntimeException (e .getMessage (), e );
346359 }
347360 }
@@ -358,7 +371,9 @@ public boolean removeConsumerGroup(final String id) {
358371 try {
359372 request .all ().get ();
360373 return true ;
361- } catch (InterruptedException | ExecutionException e ) {
374+ } catch (final ExecutionException e ) {
375+ throw handleExecutionException (e );
376+ } catch (InterruptedException e ) {
362377 // TODO Handle this
363378 throw new RuntimeException (e .getMessage (), e );
364379 }
@@ -444,7 +459,9 @@ public List<ConsumerGroupDetails> getConsumerGroupDetails(final List<String> con
444459
445460 // Return immutable list.
446461 return Collections .unmodifiableList (consumerGroupDetails );
447- } catch (final InterruptedException | ExecutionException e ) {
462+ } catch (final ExecutionException e ) {
463+ throw handleExecutionException (e );
464+ } catch (final InterruptedException e ) {
448465 throw new RuntimeException (e .getMessage (), e );
449466 }
450467 }
@@ -476,7 +493,9 @@ public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId
476493 }
477494
478495 return builder .build ();
479- } catch (final InterruptedException | ExecutionException e ) {
496+ } catch (final ExecutionException e ) {
497+ throw handleExecutionException (e );
498+ } catch (final InterruptedException e ) {
480499 throw new RuntimeException (e .getMessage (), e );
481500 }
482501 }
@@ -573,12 +592,21 @@ private List<ConfigItem> describeResource(final ConfigResource configResource) {
573592 );
574593 }
575594 return configItems ;
576- } catch (InterruptedException | ExecutionException e ) {
595+ } catch (final ExecutionException e ) {
596+ throw handleExecutionException (e );
597+ } catch (InterruptedException e ) {
577598 // TODO Handle this
578599 throw new RuntimeException (e .getMessage (), e );
579600 }
580601 }
581602
603+ private RuntimeException handleExecutionException (final ExecutionException e ) {
604+ if (e .getCause () != null && e .getCause () instanceof RuntimeException ) {
605+ return (RuntimeException ) e .getCause ();
606+ }
607+ return new RuntimeException (e .getMessage (), e );
608+ }
609+
582610 /**
583611 * Close out the Client.
584612 */
0 commit comments