Skip to content

Commit 56faf29

Browse files
authored
Merge pull request #208 from botify-labs/enhancement/108/add-retry-on-poll
Add retry on swf.process.Poller.poll and fail
2 parents 3efc2c6 + 34b759e commit 56faf29

File tree

5 files changed

+36
-28
lines changed

5 files changed

+36
-28
lines changed

simpleflow/swf/process/decider/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def process(self, decision_response):
149149
try:
150150
logger.info('completing decision for workflow {}'.format(
151151
self._workflow_name))
152-
self._complete(decision_response.token, decisions)
152+
self.complete_with_retry(decision_response.token, decisions)
153153
except Exception as err:
154154
logger.error('cannot complete decision: {}'.format(err))
155155

simpleflow/swf/process/poller.py

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,16 @@
99
from simpleflow.process import NamedMixin, with_state
1010
from simpleflow.swf.helpers import swf_identity
1111

12-
1312
logger = logging.getLogger(__name__)
1413

15-
1614
__all__ = ['Poller']
1715

1816

1917
class Poller(swf.actors.Actor, NamedMixin):
2018
"""Multi-processing implementation of a SWF actor.
2119
2220
"""
21+
2322
def __init__(self, domain, task_list=None):
2423
self.is_alive = False
2524
self._named_mixin_properties = ["task_list"]
@@ -52,6 +51,7 @@ def bind_signal_handlers(self):
5251
- SIGTERM and SIGINT lead to a graceful shutdown
5352
- other signals are not modified for now
5453
"""
54+
5555
# NB: Function is nested to have a reference to *self*.
5656
def _handle_graceful_shutdown(signum, frame):
5757
logger.info("process: caught signal signal=SIGTERM pid={}".format(os.getpid()))
@@ -74,7 +74,7 @@ def start(self):
7474
self.set_process_name()
7575
while self.is_alive:
7676
try:
77-
response = self._poll()
77+
response = self.poll_with_retry()
7878
except swf.exceptions.PollTimeout:
7979
continue
8080
self.process(response)
@@ -87,7 +87,7 @@ def stop_gracefully(self):
8787
logger.info('stopping %s', self.name)
8888
self.is_alive = False # No longer take requests.
8989

90-
def _complete(self, token, response):
90+
def complete_with_retry(self, token, response):
9191
"""
9292
Complete with retry.
9393
:param token:
@@ -97,7 +97,6 @@ def _complete(self, token, response):
9797
:return:
9898
:rtype:
9999
"""
100-
# FIXME this is a public member
101100
try:
102101
complete = utils.retry.with_delay(
103102
nb_times=self.nb_retries,
@@ -131,7 +130,7 @@ def name(self):
131130
"""
132131
return '{}()'.format(self.__class__.__name__)
133132

134-
def _poll(self):
133+
def poll_with_retry(self):
135134
"""
136135
Polls a task represented by its token and data. It uses long-polling
137136
with a timeout of one minute.
@@ -147,19 +146,26 @@ def _poll(self):
147146
identity = self.identity
148147

149148
logger.debug("polling task on %s", task_list)
150-
try:
151-
response = self.poll(
152-
task_list,
153-
identity=identity,
154-
)
155-
except swf.exceptions.PollTimeout:
156-
logger.debug('{}: PollTimeout'.format(self))
157-
raise
158-
except Exception as err:
159-
logger.error(
160-
"exception %s when polling on %s",
161-
str(err),
162-
task_list,
163-
)
164-
raise
149+
poll = utils.retry.with_delay(
150+
nb_times=self.nb_retries,
151+
delay=utils.retry.exponential,
152+
log_with=logger.exception,
153+
on_exceptions=swf.exceptions.ResponseError,
154+
)(self.poll)
155+
response = poll(task_list, identity=identity)
156+
return response
157+
158+
@abc.abstractmethod
159+
def fail(self, *args, **kwargs):
160+
"""fail; only relevant for activity workers."""
161+
raise NotImplementedError
162+
163+
def fail_with_retry(self, *args, **kwargs):
164+
fail = utils.retry.with_delay(
165+
nb_times=self.nb_retries,
166+
delay=utils.retry.exponential,
167+
log_with=logger.exception,
168+
on_exceptions=swf.exceptions.ResponseError,
169+
)(self.fail)
170+
response = fail(*args, **kwargs)
165171
return response

simpleflow/swf/process/worker/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,17 @@ def process(self, poller, token, task):
142142
except Exception as err:
143143
logger.exception("process error: {}".format(str(err)))
144144
tb = traceback.format_exc()
145-
return poller.fail(token, task, reason=str(err), details=tb)
145+
return poller.fail_with_retry(token, task, reason=str(err), details=tb)
146146

147147
try:
148-
poller._complete(token, json_dumps(result))
148+
poller.complete_with_retry(token, json_dumps(result))
149149
except Exception as err:
150150
logger.exception("complete error")
151151
reason = 'cannot complete task {}: {}'.format(
152152
task.activity_id,
153153
err,
154154
)
155-
poller.fail(token, task, reason)
155+
poller.fail_with_retry(token, task, reason)
156156

157157

158158
def process_task(poller, token, task):
@@ -204,7 +204,7 @@ def worker_alive():
204204
worker.pid
205205
))
206206
if worker.exitcode != 0:
207-
poller.fail(
207+
poller.fail_with_retry(
208208
token,
209209
task,
210210
reason='process {} died: exit code {}'.format(

simpleflow/utils/retry.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ def with_delay(
4242
:type delay: callable(value: int) -> int
4343
4444
:param on_exceptions: retry only when these exceptions raise.
45-
:type on_exceptions: Sequence([Exception])
45+
:type on_exceptions: Exception | Sequence([Exception])
4646
4747
:param except_on: don't retry on these exceptions.
4848
:type except_on: Sequence([Exception])
49+
50+
:param log_with: logging callable to use (e.g. ``logger.info``), not a logger object; defaults to this module's ``logger.info`` when None.
4951
"""
5052
if log_with is None:
5153
log_with = logging.getLogger(__name__).info

tests/test_simpleflow/swf/process/test_poller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class FakePoller(Poller):
1717
"""
1818
This poller only waits 2 seconds then exits.
1919
"""
20-
def _poll(self):
20+
def poll_with_retry(self):
2121
# NB: time.sleep gets interrupted by any signal, so the following lines
2222
# are not actually as dumb as they seem to be...
2323
time.sleep(1)

0 commit comments

Comments
 (0)