@@ -508,7 +508,7 @@ def _group_list_offset_requests(self, timestamps):
508508 return dict (timestamps_by_node )
509509
510510 def _send_list_offsets_request (self , node_id , timestamps_and_epochs ):
511- version = self ._client .api_version (ListOffsetsRequest , max_version = 4 )
511+ version = self ._client .api_version (ListOffsetsRequest , max_version = 5 )
512512 if self .config ['isolation_level' ] == 'read_committed' and version < 2 :
513513 raise Errors .UnsupportedVersionError ('read_committed isolation level requires ListOffsetsRequest >= v2' )
514514 by_topic = collections .defaultdict (list )
@@ -521,14 +521,14 @@ def _send_list_offsets_request(self, node_id, timestamps_and_epochs):
521521 data = (tp .partition , timestamp , 1 )
522522 by_topic [tp .topic ].append (data )
523523
524- if version <= 1 :
524+ if version >= 2 :
525525 request = ListOffsetsRequest [version ](
526526 - 1 ,
527+ self ._isolation_level ,
527528 list (six .iteritems (by_topic )))
528529 else :
529530 request = ListOffsetsRequest [version ](
530531 - 1 ,
531- self ._isolation_level ,
532532 list (six .iteritems (by_topic )))
533533
534534 # Client returns a future that only fails on network issues
@@ -588,7 +588,9 @@ def _handle_list_offsets_response(self, future, response):
588588 " message format version is before 0.10.0" , partition )
589589 elif error_type in (Errors .NotLeaderForPartitionError ,
590590 Errors .ReplicaNotAvailableError ,
591- Errors .KafkaStorageError ):
591+ Errors .KafkaStorageError ,
592+ Errors .OffsetNotAvailableError ,
593+ Errors .LeaderNotAvailableError ):
592594 log .debug ("Attempt to fetch offsets for partition %s failed due"
593595 " to %s, retrying." , error_type .__name__ , partition )
594596 partitions_to_retry .add (partition )
0 commit comments