@@ -75,7 +75,7 @@ public class ClientConfig {
7575 /**
7676 * How long to timeout poll requests.
7777 */
78- private final long pollTimeoutMs = 2000 ;
78+ private final long pollTimeoutMs ;
7979
8080 /**
8181 * Constructor.
@@ -89,7 +89,7 @@ private ClientConfig(
8989 final FilterConfig filterConfig ,
9090 final String consumerId ,
9191 final StartingPosition startingPosition ) {
92- this (topicConfig , filterConfig , consumerId , startingPosition , new ArrayList <>(), 10 , true );
92+ this (topicConfig , filterConfig , consumerId , startingPosition , new ArrayList <>(), 10 , true , 2000 );
9393 }
9494
9595 /**
@@ -101,6 +101,7 @@ private ClientConfig(
101101 * @param partitionIds List of partitionIds to limit consuming from.
102102 * @param maxResultsPerPartition How many records to poll per partition.
103103 * @param isAutoCommitEnabled If the consumer should auto commit state or not.
104+ * @param pollTimeoutMs poll timeout in milliseconds.
104105 */
105106 private ClientConfig (
106107 final TopicConfig topicConfig ,
@@ -109,7 +110,8 @@ private ClientConfig(
109110 final StartingPosition startingPosition ,
110111 final Collection <Integer > partitionIds ,
111112 final int maxResultsPerPartition ,
112- final boolean isAutoCommitEnabled ) {
113+ final boolean isAutoCommitEnabled ,
114+ final long pollTimeoutMs ) {
113115
114116 this .topicConfig = topicConfig ;
115117 this .filterConfig = filterConfig ;
@@ -120,6 +122,7 @@ private ClientConfig(
120122 this .partitionIds = Collections .unmodifiableSet (tempSet );
121123 this .maxResultsPerPartition = maxResultsPerPartition ;
122124 this .isAutoCommitEnabled = isAutoCommitEnabled ;
125+ this .pollTimeoutMs = pollTimeoutMs ;
123126 }
124127
125128 public TopicConfig getTopicConfig () {
@@ -206,6 +209,7 @@ public static class Builder {
206209 private StartingPosition startingPosition = StartingPosition .newResumeFromExistingState ();
207210 private int maxResultsPerPartition = 10 ;
208211 private boolean autoCommit = true ;
212+ private long pollTimeoutMs = 2000 ;
209213
210214 /**
211215 * Private constructor.
@@ -303,12 +307,22 @@ public Builder withAutoCommitDisabled() {
303307 return this ;
304308 }
305309
310+
311+ /**
312+ * Declare pollTimeout in milliseconds.
313+ */
314+ public Builder withPollTimeoutMs (final long pollTimeoutMs ) {
315+ this .pollTimeoutMs = pollTimeoutMs ;
316+ return this ;
317+ }
318+
306319 /**
307320 * Create new ClientConfig instance.
308321 */
309322 public ClientConfig build () {
310323 return new ClientConfig (
311- topicConfig , filterConfig , consumerId , startingPosition , limitPartitions , maxResultsPerPartition , autoCommit );
324+ topicConfig , filterConfig , consumerId , startingPosition , limitPartitions , maxResultsPerPartition ,
325+ autoCommit , pollTimeoutMs );
312326 }
313327 }
314328}
0 commit comments