|
25 | 25 | package org.sourcelab.kafka.webview.ui.configuration; |
26 | 26 |
|
27 | 27 | import com.fasterxml.jackson.databind.ObjectMapper; |
| 28 | +import com.google.common.util.concurrent.ThreadFactoryBuilder; |
28 | 29 | import com.hubspot.jackson.datatype.protobuf.ProtobufModule; |
29 | 30 | import org.apache.kafka.common.serialization.Deserializer; |
| 31 | +import org.slf4j.Logger; |
| 32 | +import org.slf4j.LoggerFactory; |
30 | 33 | import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager; |
31 | 34 | import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaAdminFactory; |
32 | 35 | import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaClientConfigUtil; |
|
42 | 45 | import org.springframework.context.annotation.Bean; |
43 | 46 | import org.springframework.stereotype.Component; |
44 | 47 |
|
| 48 | +import java.util.concurrent.ExecutorService; |
| 49 | +import java.util.concurrent.Executors; |
| 50 | + |
45 | 51 | /** |
46 | 52 | * Application Configuration for Plugin beans. |
47 | 53 | */ |
48 | 54 | @Component |
49 | 55 | public class PluginConfig { |
| 56 | + private static final Logger logger = LoggerFactory.getLogger(PluginConfig.class); |
50 | 57 |
|
51 | 58 | /** |
52 | 59 | * Upload manager, for handling uploads of Plugins and Keystores. |
@@ -97,11 +104,30 @@ public SecretManager getSecretManager(final AppProperties appProperties) { |
97 | 104 | */ |
98 | 105 | @Bean |
99 | 106 | public WebKafkaConsumerFactory getWebKafkaConsumerFactory(final AppProperties appProperties, final KafkaClientConfigUtil configUtil) { |
| 107 | + final ExecutorService executorService; |
| 108 | + |
| 109 | + // If we have multi-threaded consumer option enabled |
| 110 | + if (appProperties.isEnableMultiThreadedConsumer()) { |
| 111 | + logger.info("Enabled multi-threaded webconsumer with {} threads.", appProperties.getMaxConcurrentWebConsumers()); |
| 112 | + |
| 113 | + // Create fixed thread pool |
| 114 | + executorService = Executors.newFixedThreadPool( |
| 115 | + appProperties.getMaxConcurrentWebConsumers(), |
| 116 | + new ThreadFactoryBuilder() |
| 117 | + .setNameFormat("kafka-web-consumer-pool-%d") |
| 118 | + .build() |
| 119 | + ); |
| 120 | + } else { |
| 121 | + // Null reference. |
| 122 | + executorService = null; |
| 123 | + } |
| 124 | + |
100 | 125 | return new WebKafkaConsumerFactory( |
101 | 126 | getDeserializerPluginFactory(appProperties), |
102 | 127 | getRecordFilterPluginFactory(appProperties), |
103 | 128 | getSecretManager(appProperties), |
104 | | - getKafkaConsumerFactory(configUtil) |
| 129 | + getKafkaConsumerFactory(configUtil), |
| 130 | + executorService |
105 | 131 | ); |
106 | 132 | } |
107 | 133 |
|
|
0 commit comments