@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 import collections
 import copy
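A quick illustration (not part of the patch) of why the `.0` suffixes can be dropped in the hunks below: once `division` is imported from `__future__`, the `/` operator performs true division on Python 2 exactly as it does on Python 3, so dividing the millisecond configs by a plain `1000` still yields a float.

```python
from __future__ import division  # `/` is now true division, even between ints on Python 2

linger_ms = 5  # example value in milliseconds
# With true division this is 0.005 on both Python 2 and 3;
# without the __future__ import, Python 2 would truncate 5 / 1000 to 0.
assert linger_ms / 1000 == 0.005
# Floor division remains available explicitly when an integer result is wanted.
assert linger_ms // 1000 == 0
```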
@@ -138,9 +138,9 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full, |
         """
         now = time.time() if now is None else now
         since_append = now - self.last_append
-        since_ready = now - (self.created + linger_ms / 1000.0)
-        since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
-        timeout = request_timeout_ms / 1000.0
+        since_ready = now - (self.created + linger_ms / 1000)
+        since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000)
+        timeout = request_timeout_ms / 1000
 
         error = None
         if not self.in_retry() and is_full and timeout < since_append:
@@ -431,10 +431,10 @@ def ready(self, cluster, now=None): |
                 if not dq:
                     continue
                 batch = dq[0]
-                retry_backoff = self.config['retry_backoff_ms'] / 1000.0
-                linger = self.config['linger_ms'] / 1000.0
-                backing_off = bool(batch.attempts > 0 and
-                                   batch.last_attempt + retry_backoff > now)
+                retry_backoff = self.config['retry_backoff_ms'] / 1000
+                linger = self.config['linger_ms'] / 1000
+                backing_off = bool(batch.attempts > 0
+                                   and (batch.last_attempt + retry_backoff) > now)
                 waited_time = now - batch.last_attempt
                 time_to_wait = retry_backoff if backing_off else linger
                 time_left = max(time_to_wait - waited_time, 0)
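As a side note, here is a minimal standalone sketch (hypothetical function, not from this module) of the backoff/linger arithmetic the hunk above touches: the millisecond configs are converted to seconds with true division and then compared against wall-clock timestamps.

```python
from __future__ import division

import time

def seconds_until_sendable(attempts, last_attempt,
                           retry_backoff_ms=100, linger_ms=5, now=None):
    """Toy version of the wait computation in ready(); names mirror the diff."""
    now = time.time() if now is None else now
    retry_backoff = retry_backoff_ms / 1000   # 0.1 s as a float, thanks to true division
    linger = linger_ms / 1000                 # 0.005 s
    # A batch that already failed is held back until its retry backoff has elapsed;
    # otherwise it only needs to wait out the linger window.
    backing_off = bool(attempts > 0 and (last_attempt + retry_backoff) > now)
    waited_time = now - last_attempt
    time_to_wait = retry_backoff if backing_off else linger
    return max(time_to_wait - waited_time, 0)
```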
@@ -499,12 +499,8 @@ def drain(self, cluster, nodes, max_size, now=None): |
                         dq = self._batches[tp]
                         if dq:
                             first = dq[0]
-                            backoff = (
-                                bool(first.attempts > 0) and
-                                bool(first.last_attempt +
-                                     self.config['retry_backoff_ms'] / 1000.0
-                                     > now)
-                            )
+                            backoff = bool(first.attempts > 0 and
+                                           first.last_attempt + self.config['retry_backoff_ms'] / 1000 > now)
                             # Only drain the batch if it is not during backoff
                             if not backoff:
                                 if (size + first.records.size_in_bytes() > max_size