1313import io .kafbat .ui .exception .NotFoundException ;
1414import io .kafbat .ui .exception .ValidationException ;
1515import io .kafbat .ui .util .KafkaVersion ;
16+ import io .kafbat .ui .util .MetadataVersion ;
1617import io .kafbat .ui .util .annotation .KafkaClientInternalsDependant ;
1718import java .io .Closeable ;
1819import java .time .Duration ;
4950import org .apache .kafka .clients .admin .DescribeClusterOptions ;
5051import org .apache .kafka .clients .admin .DescribeClusterResult ;
5152import org .apache .kafka .clients .admin .DescribeConfigsOptions ;
53+ import org .apache .kafka .clients .admin .FeatureMetadata ;
54+ import org .apache .kafka .clients .admin .FinalizedVersionRange ;
5255import org .apache .kafka .clients .admin .ListConsumerGroupOffsetsSpec ;
5356import org .apache .kafka .clients .admin .ListOffsetsResult ;
5457import org .apache .kafka .clients .admin .ListTopicsOptions ;
9699@ Slf4j
97100@ AllArgsConstructor
98101public class ReactiveAdminClient implements Closeable {
102+ private static final String DEFAULT_UNKNOWN_VERSION = "Unknown" ;
99103
100104 public enum SupportedFeature {
101105 INCREMENTAL_ALTER_CONFIGS (2.3f ),
@@ -114,8 +118,8 @@ public enum SupportedFeature {
114118 this .predicate = (admin , ver ) -> Mono .just (ver != null && ver >= fromVersion );
115119 }
116120
117- static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , String kafkaVersionStr ) {
118- @ Nullable Float kafkaVersion = KafkaVersion . parse ( kafkaVersionStr ).orElse (null );
121+ static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , Optional < String > kafkaVersionStr ) {
122+ @ Nullable Float kafkaVersion = kafkaVersionStr . flatMap ( KafkaVersion :: parse ).orElse (null );
119123 return Flux .fromArray (SupportedFeature .values ())
120124 .flatMap (f -> f .predicate .apply (ac , kafkaVersion ).map (enabled -> Tuples .of (f , enabled )))
121125 .filter (Tuple2 ::getT2 )
@@ -150,18 +154,28 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
150154 .orElse (desc .getNodes ().iterator ().next ().id ());
151155 return loadBrokersConfig (ac , List .of (targetNodeId ))
152156 .map (map -> map .isEmpty () ? List .<ConfigEntry >of () : map .get (targetNodeId ))
153- .flatMap (configs -> {
154- String version = "1.0-UNKNOWN" ;
157+ .zipWith (toMono (ac .describeFeatures ().featureMetadata ()))
158+ .flatMap (tuple -> {
159+ List <ConfigEntry > configs = tuple .getT1 ();
160+ FeatureMetadata featureMetadata = tuple .getT2 ();
161+ Optional <String > version = Optional .empty ();
155162 boolean topicDeletionEnabled = true ;
156163 for (ConfigEntry entry : configs ) {
157164 if (entry .name ().contains ("inter.broker.protocol.version" )) {
158- version = entry .value ();
165+ version = Optional . of ( entry .value () );
159166 }
160167 if (entry .name ().equals ("delete.topic.enable" )) {
161168 topicDeletionEnabled = Boolean .parseBoolean (entry .value ());
162169 }
163170 }
164- final String finalVersion = version ;
171+ if (version .isEmpty ()) {
172+ FinalizedVersionRange metadataVersion =
173+ featureMetadata .finalizedFeatures ().get ("metadata.version" );
174+ if (metadataVersion != null ) {
175+ version = MetadataVersion .findVersion (metadataVersion .maxVersionLevel ());
176+ }
177+ }
178+ final String finalVersion = version .orElse (DEFAULT_UNKNOWN_VERSION );
165179 final boolean finalTopicDeletionEnabled = topicDeletionEnabled ;
166180 return SupportedFeature .forVersion (ac , version )
167181 .map (features -> new ConfigRelatedInfo (finalVersion , features , finalTopicDeletionEnabled ));
0 commit comments