From 449cbe7569b92db2b6559e0e8f10523d9e3b353f Mon Sep 17 00:00:00 2001 From: Brian Ashburn Date: Mon, 29 Aug 2022 22:06:26 -0700 Subject: [PATCH] Update to version 3.0 client and update interfaces --- opentracing-kafka-client/pom.xml | 5 +++ .../contrib/kafka/TracingKafkaConsumer.java | 36 ++++++++++++++++--- .../contrib/kafka/TracingKafkaProducer.java | 14 +++++--- .../streams/TracingKafkaClientSupplier.java | 7 ++++ pom.xml | 12 +++---- 5 files changed, 58 insertions(+), 16 deletions(-) diff --git a/opentracing-kafka-client/pom.xml b/opentracing-kafka-client/pom.xml index a8001c8..dcd094e 100644 --- a/opentracing-kafka-client/pom.xml +++ b/opentracing-kafka-client/pom.xml @@ -31,6 +31,11 @@ org.apache.kafka kafka-clients + + org.slf4j + slf4j-api + 1.7.30 + \ No newline at end of file diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java index 04e0b75..2e974b2 100644 --- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java +++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java @@ -20,11 +20,13 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -212,6 +214,16 @@ public OffsetAndMetadata committed(TopicPartition topicPartition, Duration durat return consumer.committed(topicPartition, duration); } + @Override + public Map committed(final Set set) { + return consumer.committed(set); + } + + @Override + public Map committed(final Set set, final Duration duration) { + return consumer.committed(set, duration); + } + @Override public Map metrics() { return consumer.metrics(); @@ -287,14 +299,28 @@ public Map endOffsets(Collection collectio } @Override - public void close() { - consumer.close(); + public OptionalLong currentLag(final TopicPartition topicPartition) { + return consumer.currentLag(topicPartition); } @Override - @Deprecated - public void close(long l, TimeUnit timeUnit) { - consumer.close(l, timeUnit); + public ConsumerGroupMetadata groupMetadata() { + return consumer.groupMetadata(); + } + + @Override + public void enforceRebalance() { + consumer.enforceRebalance(); + } + + @Override + public void enforceRebalance(final String s) { + consumer.enforceRebalance(s); + } + + @Override + public void close() { + consumer.close(); } @Override diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java index 5ea3745..4667e06 100644 --- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java +++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java @@ -24,6 +24,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; + +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; @@ -87,6 +89,13 @@ public void sendOffsetsToTransaction(Map map, producer.sendOffsetsToTransaction(map, s); } + @Override + public void sendOffsetsToTransaction(final Map map, + final ConsumerGroupMetadata consumerGroupMetadata + ) throws ProducerFencedException { + producer.sendOffsetsToTransaction(map, consumerGroupMetadata); + } + @Override public void commitTransaction() throws ProducerFencedException { producer.commitTransaction(); @@ -146,9 +155,4 @@ public void close(Duration duration) { producer.close(duration); } - @Override - public void close(long timeout, TimeUnit timeUnit) { - producer.close(timeout, timeUnit); - } - } diff --git a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java index a3cc3e0..fbb3562 100644 --- a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java +++ b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java @@ -20,6 +20,8 @@ import io.opentracing.util.GlobalTracer; import java.util.Map; import java.util.function.BiFunction; + +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -77,6 +79,11 @@ public AdminClient getAdminClient(final Map config) { return AdminClient.create(config); } + @Override + public Admin getAdmin(final Map config) { + return this.getAdminClient(config); + } + @Override public Producer getProducer(Map config) { return new TracingKafkaProducer<>( diff --git a/pom.xml b/pom.xml index 5268d07..95cda68 100644 --- a/pom.xml +++ b/pom.xml @@ -62,13 +62,13 @@ - 1.8 + 11 UTF-8 UTF-8 0.32.0 - 2.2.0 - 2.2.4.RELEASE + 3.2.1 + 2.9.0 4.3.0 0.8.3 @@ -127,21 +127,21 @@ org.apache.kafka - kafka_2.11 + kafka_2.12 org.apache.kafka - kafka_2.12 + kafka_2.13 ${kafka.version} test org.apache.kafka - kafka_2.12 + kafka_2.13 ${kafka.version} test test