Skip to content

Commit 97d7b3c

Browse files
mimaison authored and edoardocomar committed
Update kafka-java-console-sample (#32)
* Update kafka-java-console-sample - refreshed dependencies - use slf4j Loggers - small cleanups
1 parent 3163dfa commit 97d7b3c

File tree

6 files changed

+89
-91
lines changed

6 files changed

+89
-91
lines changed

kafka-java-console-sample/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ repositories {
3636
}
3737

3838
dependencies {
39-
compile 'org.apache.kafka:kafka-clients:2.2.+'
39+
compile 'org.apache.kafka:kafka-clients:2.4.+'
4040
compile 'log4j:log4j:1.2.17'
4141
compile 'org.slf4j:slf4j-log4j12:1.7.25'
4242
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'

kafka-java-console-sample/kafka-java-console-sample.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ metadata:
66
spec:
77
containers:
88
- image: ibmcom/kafka-java-console-sample:latest
9-
imagePullPolicy: IfNotPresent
9+
imagePullPolicy: Always
1010
name: java-sample
1111
env:
1212
- name: VCAP_SERVICES
1313
valueFrom:
1414
secretKeyRef:
1515
name: eventstreams-binding
16-
key: binding
16+
key: binding

kafka-java-console-sample/src/main/java/com/eventstreams/samples/ConsumerRunnable.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,46 +19,46 @@
1919
*/
2020
package com.eventstreams.samples;
2121

22+
import java.time.Duration;
2223
import java.util.Arrays;
2324
import java.util.List;
24-
import java.util.Properties;
25-
import java.time.Duration;
25+
import java.util.Map;
2626

2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
2828
import org.apache.kafka.clients.consumer.ConsumerRecords;
2929
import org.apache.kafka.clients.consumer.KafkaConsumer;
3030
import org.apache.kafka.common.KafkaException;
3131
import org.apache.kafka.common.PartitionInfo;
3232
import org.apache.kafka.common.errors.WakeupException;
33-
import org.apache.log4j.Level;
34-
import org.apache.log4j.Logger;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3535

3636
public class ConsumerRunnable implements Runnable {
37-
private static final Logger logger = Logger.getLogger(ConsumerRunnable.class);
37+
private static final Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class);
3838

3939
private final KafkaConsumer<String, String> kafkaConsumer;
4040
private volatile boolean closing = false;
4141

42-
public ConsumerRunnable(Properties consumerProperties, String topic) {
42+
public ConsumerRunnable(Map<String, Object> consumerConfigs, String topic) {
4343
// Create a Kafka consumer with the provided client configuration
44-
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
44+
kafkaConsumer = new KafkaConsumer<String, String>(consumerConfigs);
4545

4646
// Checking for topic existence before subscribing
4747
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topic);
4848
if (partitions == null || partitions.isEmpty()) {
49-
logger.log(Level.ERROR, "Topic '" + topic + "' does not exists - application will terminate");
50-
kafkaConsumer.close();
49+
logger.error("Topic '{}' does not exists - application will terminate", topic);
50+
kafkaConsumer.close(Duration.ofSeconds(5L));
5151
throw new IllegalStateException("Topic '" + topic + "' does not exists - application will terminate");
5252
} else {
53-
logger.log(Level.INFO, partitions.toString());
53+
logger.info(partitions.toString());
5454
}
55-
55+
5656
kafkaConsumer.subscribe(Arrays.asList(topic));
5757
}
5858

5959
@Override
6060
public void run() {
61-
logger.log(Level.INFO, ConsumerRunnable.class.toString() + " is starting.");
61+
logger.info("{} is starting.", ConsumerRunnable.class);
6262

6363
try {
6464
while (!closing) {
@@ -67,34 +67,34 @@ public void run() {
6767
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000L));
6868

6969
if (records.isEmpty()) {
70-
logger.log(Level.INFO, "No messages consumed");
70+
logger.info("No messages consumed");
7171
} else {
7272
// Iterate through all the messages received and print their content
7373
for (ConsumerRecord<String, String> record : records) {
74-
logger.log(Level.INFO, "Message consumed: " + record.toString());
74+
logger.info("Message consumed: {}", record);
7575
}
7676
}
7777

7878
} catch (final WakeupException e) {
79-
logger.log(Level.WARN, "Consumer closing - caught exception: " + e);
79+
logger.warn("Consumer closing - caught exception: {}", e);
8080
} catch (final KafkaException e) {
81-
logger.log(Level.ERROR, "Sleeping for 5s - Consumer has caught: " + e, e);
81+
logger.error("Sleeping for 5s - Consumer has caught: {}", e, e);
8282
try {
8383
Thread.sleep(5000); // Longer sleep before retrying
8484
} catch (InterruptedException e1) {
85-
logger.log(Level.WARN, "Consumer closing - caught exception: " + e);
85+
logger.warn("Consumer closing - caught exception: {}", e);
8686
}
8787
}
8888
}
8989
} finally {
90-
kafkaConsumer.close();
91-
logger.log(Level.INFO, ConsumerRunnable.class.toString() + " has shut down.");
90+
kafkaConsumer.close(Duration.ofSeconds(5L));
91+
logger.info("{} has shut down.", ConsumerRunnable.class);
9292
}
9393
}
9494

9595
public void shutdown() {
9696
closing = true;
9797
kafkaConsumer.wakeup();
98-
logger.log(Level.INFO, ConsumerRunnable.class.toString() + " is shutting down.");
98+
logger.info("{} is shutting down.", ConsumerRunnable.class);
9999
}
100100
}

kafka-java-console-sample/src/main/java/com/eventstreams/samples/EventStreamsConsoleSample.java

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.Thread.UncaughtExceptionHandler;
2323
import java.util.Arrays;
2424
import java.util.Collections;
25+
import java.util.HashMap;
2526
import java.util.Map;
2627
import java.util.Properties;
2728
import java.util.concurrent.ExecutionException;
@@ -37,8 +38,10 @@
3738
import org.apache.kafka.common.config.SaslConfigs;
3839
import org.apache.kafka.common.config.SslConfigs;
3940
import org.apache.kafka.common.errors.TopicExistsException;
40-
import org.apache.log4j.Level;
41-
import org.apache.log4j.Logger;
41+
import org.apache.kafka.common.serialization.StringDeserializer;
42+
import org.apache.kafka.common.serialization.StringSerializer;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4245

4346
import com.eventstreams.samples.env.Environment;
4447
import com.eventstreams.samples.env.EventStreamsCredentials;
@@ -55,7 +58,7 @@ public class EventStreamsConsoleSample {
5558
private static final String ARG_CONSUMER = "-consumer";
5659
private static final String ARG_PRODUCER_ = "-producer";
5760
private static final String ARG_TOPIC = "-topic";
58-
private static final Logger logger = Logger.getLogger(EventStreamsConsoleSample.class);
61+
private static final Logger logger = LoggerFactory.getLogger(EventStreamsConsoleSample.class);
5962

6063
private static Thread consumerThread = null;
6164
private static ConsumerRunnable consumerRunnable = null;
@@ -67,14 +70,14 @@ public class EventStreamsConsoleSample {
6770
Runtime.getRuntime().addShutdownHook(new Thread() {
6871
@Override
6972
public void run() {
70-
logger.log(Level.WARN, "Shutdown received.");
73+
logger.warn("Shutdown received.");
7174
shutdown();
7275
}
7376
});
7477
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
7578
@Override
7679
public void uncaughtException(Thread t, Throwable e) {
77-
logger.log(Level.ERROR, "Uncaught Exception on " + t.getName() + " : " + e, e);
80+
logger.error("Uncaught Exception on {} : {}", t.getName(), e, e);
7881
shutdown();
7982
}
8083
});
@@ -117,24 +120,19 @@ public static void main(String args[]) {
117120
}
118121
// Check environment: VCAP_SERVICES vs command line arguments, to obtain configuration parameters
119122
if (args.length == 0) {
120-
121-
logger.log(Level.INFO, "Using VCAP_SERVICES to find credentials.");
122-
123+
logger.info("Using VCAP_SERVICES to find credentials.");
123124
EventStreamsCredentials credentials = Environment.getEventStreamsCredentials();
124-
125125
bootstrapServers = stringArrayToCSV(credentials.getKafkaBrokersSasl());
126126
apiKey = credentials.getApiKey();
127-
128127
} else {
129128
// If running locally, parse the command line
130129
if (args.length < 2) {
131-
logger.log(Level.ERROR, "It appears the application is running without VCAP_SERVICES but the arguments are incorrect for local mode.");
130+
logger.error("It appears the application is running without VCAP_SERVICES but the arguments are incorrect for local mode.");
132131
printUsage();
133132
System.exit(-1);
134133
}
135134

136-
logger.log(Level.INFO, "Using command line arguments to find credentials.");
137-
135+
logger.info("Using command line arguments to find credentials.");
138136
bootstrapServers = args[0];
139137
apiKey = args[1];
140138
if (apiKey.contains(":")) {
@@ -162,50 +160,50 @@ public static void main(String args[]) {
162160
topicName = parsedArgs.get(ARG_TOPIC);
163161
}
164162
} catch (IllegalArgumentException e) {
165-
logger.log(Level.ERROR, e.getMessage());
163+
logger.error(e.getMessage());
166164
System.exit(-1);
167165
}
168166
}
169167
}
170168

171-
logger.log(Level.INFO, "Kafka Endpoints: " + bootstrapServers);
169+
logger.info("Kafka Endpoints: {}", bootstrapServers);
172170

173171
//Using Kafka Admin API to create topic
174172
try (AdminClient admin = AdminClient.create(getAdminConfigs(bootstrapServers, apiKey))) {
175-
logger.log(Level.INFO, "Creating the topic " + topicName);
173+
logger.info("Creating the topic {}", topicName);
176174
NewTopic newTopic = new NewTopic(topicName, 1, (short) 3);
177175
CreateTopicsResult ctr = admin.createTopics(Collections.singleton(newTopic));
178176
ctr.all().get(10, TimeUnit.SECONDS);
179177
} catch (ExecutionException ee) {
180178
if (ee.getCause() instanceof TopicExistsException) {
181-
logger.log(Level.INFO, "Topic " + topicName + " already exists");
179+
logger.info("Topic {} already exists", topicName);
182180
} else {
183-
logger.log(Level.ERROR, "Error occurred creating the topic " + topicName, ee);
181+
logger.error("Error occurred creating the topic " + topicName, ee);
184182
System.exit(-1);
185183
}
186184
} catch (Exception e) {
187-
logger.log(Level.ERROR, "Error occurred creating the topic " + topicName, e);
185+
logger.error("Error occurred creating the topic {}", topicName, e);
188186
System.exit(-1);
189187
}
190188

191189
//create the Kafka clients
192190
if (runConsumer) {
193-
Properties consumerProperties = getConsumerConfigs(bootstrapServers, apiKey);
194-
consumerRunnable = new ConsumerRunnable(consumerProperties, topicName);
191+
Map<String, Object> consumerConfigs = getConsumerConfigs(bootstrapServers, apiKey);
192+
consumerRunnable = new ConsumerRunnable(consumerConfigs, topicName);
195193
consumerThread = new Thread(consumerRunnable, "Consumer Thread");
196194
consumerThread.start();
197195
}
198196

199197
if (runProducer) {
200-
Properties producerProperties = getProducerConfigs(bootstrapServers, apiKey);
201-
producerRunnable = new ProducerRunnable(producerProperties, topicName);
198+
Map<String, Object> producerConfigs = getProducerConfigs(bootstrapServers, apiKey);
199+
producerRunnable = new ProducerRunnable(producerConfigs, topicName);
202200
producerThread = new Thread(producerRunnable, "Producer Thread");
203201
producerThread.start();
204202
}
205203

206-
logger.log(Level.INFO, "EventStreamsConsoleSample will run until interrupted.");
204+
logger.info("EventStreamsConsoleSample will run until interrupted.");
207205
} catch (Exception e) {
208-
logger.log(Level.ERROR, "Exception occurred, application will terminate", e);
206+
logger.error("Exception occurred, application will terminate", e);
209207
System.exit(-1);
210208
}
211209
}
@@ -236,21 +234,21 @@ private static String stringArrayToCSV(String[] sArray) {
236234
return sb.toString();
237235
}
238236

239-
static final Properties getProducerConfigs(String bootstrapServers, String apikey) {
240-
Properties configs = new Properties();
241-
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
242-
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
237+
static final Map<String, Object> getProducerConfigs(String bootstrapServers, String apikey) {
238+
Map<String, Object> configs = new HashMap<>();
239+
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
240+
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
243241
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-java-console-sample-producer");
244-
configs.put(ProducerConfig.ACKS_CONFIG, "-1");
242+
configs.put(ProducerConfig.ACKS_CONFIG, "all");
245243
configs.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG,"use_all_dns_ips");
246244
configs.putAll(getCommonConfigs(bootstrapServers, apikey));
247245
return configs;
248246
}
249247

250-
static final Properties getConsumerConfigs(String bootstrapServers, String apikey) {
251-
Properties configs = new Properties();
252-
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
253-
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
248+
static final Map<String, Object> getConsumerConfigs(String bootstrapServers, String apikey) {
249+
Map<String, Object> configs = new HashMap<>();
250+
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
251+
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
254252
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-java-console-sample-consumer");
255253
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-console-sample-group");
256254
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
@@ -259,8 +257,8 @@ static final Properties getConsumerConfigs(String bootstrapServers, String apike
259257
return configs;
260258
}
261259

262-
static final Properties getCommonConfigs(String boostrapServers, String apikey) {
263-
Properties configs = new Properties();
260+
static final Map<String, Object> getCommonConfigs(String boostrapServers, String apikey) {
261+
Map<String, Object> configs = new HashMap<>();
264262
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
265263
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
266264
configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

0 commit comments

Comments (0)