Skip to content

Commit 34e2ed2

Browse files
authored
Merge pull request #1303 from confluentinc/sranade/fix-rest-build
Fix rest build due to upstream changes in ce-kafka
2 parents b7b1bba + 703ea0d commit 34e2ed2

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerState.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.time.Instant;
3737
import java.util.ArrayDeque;
3838
import java.util.Collection;
39+
import java.util.Collections;
3940
import java.util.HashMap;
4041
import java.util.List;
4142
import java.util.Map;
@@ -269,7 +270,8 @@ public synchronized ConsumerCommittedResponse committed(ConsumerCommittedRequest
269270
if (consumer != null) {
270271
for (io.confluent.kafkarest.entities.v2.TopicPartition t : request.getPartitions()) {
271272
TopicPartition partition = new TopicPartition(t.getTopic(), t.getPartition());
272-
OffsetAndMetadata offsetMetadata = consumer.committed(partition);
273+
OffsetAndMetadata offsetMetadata =
274+
consumer.committed(Collections.singleton(partition)).get(partition);
273275
if (offsetMetadata != null) {
274276
offsets.add(
275277
new TopicPartitionOffsetMetadata(
@@ -381,7 +383,7 @@ synchronized ConsumerRecord<KafkaKeyT, KafkaValueT> next() {
381383
* be invoked with the lock held, i.e. after startRead().
382384
*/
383385
private synchronized void getOrCreateConsumerRecords() {
384-
ConsumerRecords<KafkaKeyT, KafkaValueT> polledRecords = consumer.poll(0);
386+
ConsumerRecords<KafkaKeyT, KafkaValueT> polledRecords = consumer.poll(Duration.ofSeconds(0L));
385387
// drain the iterator and buffer to list
386388
for (ConsumerRecord<KafkaKeyT, KafkaValueT> consumerRecord : polledRecords) {
387389
consumerRecords.add(consumerRecord);

0 commit comments

Comments
 (0)