2828import org .apache .kafka .clients .producer .ProducerRecord ;
2929import org .apache .kafka .clients .producer .RecordMetadata ;
3030import org .apache .kafka .common .TopicPartition ;
31- import org .apache . log4j . Level ;
32- import org .apache . log4j . Logger ;
31+ import org .slf4j . Logger ;
32+ import org .slf4j . LoggerFactory ;
3333
3434import javax .servlet .annotation .WebServlet ;
3535import javax .servlet .http .HttpServlet ;
5252@ WebServlet ("/KafkaServlet" )
5353public class KafkaServlet extends HttpServlet {
5454 private static final long serialVersionUID = 1L ;
55- private static final Logger logger = Logger .getLogger (KafkaServlet .class );
55+ private static final Logger logger = LoggerFactory .getLogger (KafkaServlet .class );
5656
5757 private final String serverConfigDir = System .getProperty ("server.config.dir" );
5858 private final String resourceDir = serverConfigDir + File .separator
@@ -73,9 +73,9 @@ public class KafkaServlet extends HttpServlet {
7373 * Intialising the KafkaServlet
7474 */
7575 public void init () {
76- logger .log ( Level . WARN , "Initialising Kafka Servlet" );
77- logger .log ( Level . WARN , "Server Config directory: " + serverConfigDir );
78- logger .log ( Level . WARN , "Resource directory: " + resourceDir );
76+ logger .warn ( "Initialising Kafka Servlet" );
77+ logger .warn ( "Server Config directory: " + serverConfigDir );
78+ logger .warn ( "Resource directory: " + resourceDir );
7979
8080 // Retrieve credentials from environment
8181 EventStreamsCredentials credentials = Environment .getEventStreamsCredentials ();
@@ -93,7 +93,7 @@ public void init() {
9393 restApi .post ("/admin/topics" , "{ \" name\" : \" " + topic + "\" }" , new int []{422 });
9494
9595 String topics = restApi .get ("/admin/topics" , false );
96- logger .log ( Level . WARN , "Topics: " + topics );
96+ logger .warn ( "Topics: " + topics );
9797
9898 // Initialize Kafka Producer
9999 kafkaProducer = new KafkaProducer <>(getClientConfiguration (bootstrapServers , credentials .getApiKey (), true ));
@@ -172,12 +172,12 @@ private Properties getClientConfiguration(String broker, String apikey, boolean
172172 }
173173
174174 try {
175- logger .log ( Level . WARN , "Reading properties file from: " + fileName );
175+ logger .warn ( "Reading properties file from: " + fileName );
176176 propsStream = new FileInputStream (fileName );
177177 props .load (propsStream );
178178 propsStream .close ();
179179 } catch (IOException e ) {
180- logger .log ( Level . ERROR , e );
180+ logger .error ( e . getMessage (), e );
181181 return props ;
182182 }
183183
@@ -188,7 +188,7 @@ private Properties getClientConfiguration(String broker, String apikey, boolean
188188 saslJaasConfig = saslJaasConfig .replace ("APIKEY" , apikey );
189189 props .setProperty ("sasl.jaas.config" , saslJaasConfig );
190190
191- logger .log ( Level . WARN , "Using properties: " + props );
191+ logger .warn ( "Using properties: " + props );
192192
193193 return props ;
194194 }
@@ -199,7 +199,7 @@ private Properties getClientConfiguration(String broker, String apikey, boolean
199199 * @param topic
200200 */
201201 private void produce (String topic ) {
202- logger .log ( Level . WARN , "Producer is starting." );
202+ logger .warn ( "Producer is starting." );
203203
204204 String key = "key" ;
205205 // Push a message into the list to be sent.
@@ -217,7 +217,7 @@ private void produce(String topic) {
217217 RecordMetadata m = kafkaProducer .send (record ).get ();
218218 producedMessages ++;
219219
220- logger .log ( Level . WARN , "Message produced, offset: " + m .offset ());
220+ logger .warn ( "Message produced, offset: " + m .offset ());
221221
222222 Thread .sleep (1000 );
223223 } catch (final Exception e ) {
@@ -226,7 +226,7 @@ private void produce(String topic) {
226226 System .exit (-1 );
227227 }
228228 messageProduced = true ;
229- logger .log ( Level . WARN , "Producer is shutting down." );
229+ logger .warn ( "Producer is shutting down." );
230230 }
231231
232232 /**
@@ -250,19 +250,19 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
250250 @ Override
251251 public void onPartitionsAssigned (Collection <org .apache .kafka .common .TopicPartition > partitions ) {
252252 try {
253- logger .log ( Level . WARN , "Partitions " + partitions + " assigned, consumer seeking to end." );
253+ logger .warn ( "Partitions " + partitions + " assigned, consumer seeking to end." );
254254
255255 for (TopicPartition partition : partitions ) {
256256 long position = kafkaConsumer .position (partition );
257- logger .log ( Level . WARN , "current Position: " + position );
257+ logger .warn ( "current Position: " + position );
258258
259- logger .log ( Level . WARN , "Seeking to end..." );
259+ logger .warn ( "Seeking to end..." );
260260 kafkaConsumer .seekToEnd (Arrays .asList (partition ));
261- logger .log ( Level . WARN ,
261+ logger .warn (
262262 "Seek from the current position: " + kafkaConsumer .position (partition ));
263263 kafkaConsumer .seek (partition , position );
264264 }
265- logger .log ( Level . WARN , "Producer can now begin producing messages." );
265+ logger .warn ( "Producer can now begin producing messages." );
266266 } catch (final Exception e ) {
267267 logger .error ("Error when assigning partitions" , e );
268268 }
@@ -273,7 +273,7 @@ public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartiti
273273
274274 @ Override
275275 public void run () {
276- logger .log ( Level . WARN , "Consumer is starting." );
276+ logger .warn ( "Consumer is starting." );
277277
278278 while (!closing ) {
279279 try {
@@ -291,15 +291,15 @@ public void run() {
291291
292292 Thread .sleep (1000 );
293293 } catch (final InterruptedException e ) {
294- logger .log ( Level . ERROR , "Producer/Consumer loop has been unexpectedly interrupted" );
294+ logger .error ( "Producer/Consumer loop has been unexpectedly interrupted" );
295295 shutdown ();
296296 } catch (final Exception e ) {
297- logger .log ( Level . ERROR , "Consumer has failed with exception: " + e );
297+ logger .error ( "Consumer has failed with exception: " + e );
298298 shutdown ();
299299 }
300300 }
301301
302- logger .log ( Level . WARN , "Consumer is shutting down." );
302+ logger .warn ( "Consumer is shutting down." );
303303 kafkaConsumer .close ();
304304 consumedMessages .clear ();
305305 }
0 commit comments