diff --git a/opentracing-kafka-client/pom.xml b/opentracing-kafka-client/pom.xml index 2ac937c..190dbc6 100644 --- a/opentracing-kafka-client/pom.xml +++ b/opentracing-kafka-client/pom.xml @@ -32,6 +32,11 @@ kafka-clients provided + + org.slf4j + slf4j-api + 1.7.30 + \ No newline at end of file diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java index e00a363..416b9e4 100644 --- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java +++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; @@ -224,7 +225,7 @@ public Map committed(Set part public Map committed(Set partitions, final Duration timeout) { return consumer.committed(partitions, timeout); - } + } @Override public Map metrics() { @@ -300,6 +301,11 @@ public Map endOffsets(Collection collectio return consumer.endOffsets(collection, duration); } + @Override + public OptionalLong currentLag(final TopicPartition topicPartition) { + return consumer.currentLag(topicPartition); + } + @Override public ConsumerGroupMetadata groupMetadata() { return consumer.groupMetadata(); @@ -311,14 +317,13 @@ public void enforceRebalance() { } @Override - public void close() { - consumer.close(); + public void enforceRebalance(final String s) { + consumer.enforceRebalance(s); } @Override - @Deprecated - public void close(long l, TimeUnit timeUnit) { - consumer.close(l, timeUnit); + public void close() { + consumer.close(); } @Override diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java index 3447d68..e9b7750 100644 --- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java +++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java @@ -166,9 +166,4 @@ public void close(Duration duration) { producer.close(duration); } - @Override - public void close(long timeout, TimeUnit timeUnit) { - producer.close(timeout, timeUnit); - } - } diff --git a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java index a59aabc..1ae5e8d 100644 --- a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java +++ b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java @@ -60,9 +60,10 @@ public void test() { await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), greaterThanOrEqualTo(3)); List spans = mockTracer.finishedSpans(); + spans.forEach(s -> System.out.println("Span Operation: " + s.operationName())); assertThat(spans, contains( - new SpanMatcher("To_spring"), new SpanMatcher("From_spring"), + new SpanMatcher("To_spring"), new SpanMatcher("KafkaListener_spring"))); } diff --git a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java index a2c4eff..f3e4bff 100644 --- a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java +++ b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java @@ -25,6 +25,8 @@ import java.util.Collections; import java.util.Map; import java.util.function.BiFunction; + +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -95,6 +97,11 @@ public AdminClient getAdminClient(final Map config) { return AdminClient.create(config); } + @Override + public Admin getAdmin(final Map config) { + return this.getAdminClient(config); + } + @Override public Producer getProducer(Map config) { return new TracingKafkaProducerBuilder<>( diff --git a/pom.xml b/pom.xml index c1289af..3678659 100644 --- a/pom.xml +++ b/pom.xml @@ -62,13 +62,13 @@ - 1.8 + 11 UTF-8 UTF-8 - 0.33.0 - 2.6.0 - 2.6.1 + 0.32.0 + 3.2.1 + 2.9.0 5.2.7.RELEASE 4.3.0 0.8.5 @@ -128,7 +128,7 @@ org.apache.kafka - kafka_2.11 + kafka_2.12 org.apache.kafka @@ -268,7 +268,7 @@ org.apache.maven.plugins maven-source-plugin - 3.2.0 + 3.2.1 attach-sources @@ -282,7 +282,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.1.1 + 3.4.1 false