Skip to content

Commit 102eddd

Browse files
maybe-sybrauvipy
authored andcommitted
fix: Calling of errbacks when chords fail
We had a special case for calling errbacks when a chord failed which assumed they were old style. This change ensures that we call the proper errback dispatch method which understands new and old style errbacks, and adds test to confirm that things behave as one might expect now.
1 parent ee9a251 commit 102eddd

File tree

6 files changed

+304
-39
lines changed

6 files changed

+304
-39
lines changed

celery/backends/base.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,19 +278,24 @@ def mark_as_retry(self, task_id, exc, traceback=None,
278278
traceback=traceback, request=request)
279279

280280
def chord_error_from_stack(self, callback, exc=None):
281-
# need below import for test for some crazy reason
282-
from celery import group # pylint: disable
283281
app = self.app
284282
try:
285283
backend = app._tasks[callback.task].backend
286284
except KeyError:
287285
backend = self
286+
# We have to make a fake request since either the callback failed or
287+
# we're pretending it did since we don't have information about the
288+
# chord part(s) which failed. This request is constructed as a best
289+
# effort for new style errbacks and may be slightly misleading about
290+
# what really went wrong, but at least we call them!
291+
fake_request = Context({
292+
"id": callback.options.get("task_id"),
293+
"errbacks": callback.options.get("link_error", []),
294+
"delivery_info": dict(),
295+
**callback
296+
})
288297
try:
289-
group(
290-
[app.signature(errback)
291-
for errback in callback.options.get('link_error') or []],
292-
app=app,
293-
).apply_async((callback.id,))
298+
self._call_task_errbacks(fake_request, exc, None)
294299
except Exception as eb_exc: # pylint: disable=broad-except
295300
return backend.fail_from_current_stack(callback.id, exc=eb_exc)
296301
else:

celery/canvas.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,18 +1123,17 @@ def set_immutable(self, immutable):
11231123
task.set_immutable(immutable)
11241124

11251125
def link(self, sig):
1126-
# Simply link to first task
1126+
# Simply link to first task. Doing this is slightly misleading because
1127+
# the callback may be executed before all children in the group are
1128+
# completed and also if any children other than the first one fail.
1129+
#
1130+
# The callback signature is cloned and made immutable since it the
1131+
# first task isn't actually capable of passing the return values of its
1132+
# siblings to the callback task.
11271133
sig = sig.clone().set(immutable=True)
11281134
return self.tasks[0].link(sig)
11291135

11301136
def link_error(self, sig):
1131-
try:
1132-
sig = sig.clone().set(immutable=True)
1133-
except AttributeError:
1134-
# See issue #5265. I don't use isinstance because current tests
1135-
# pass a Mock object as argument.
1136-
sig['immutable'] = True
1137-
sig = Signature.from_dict(sig)
11381137
# Any child task might error so we need to ensure that they are all
11391138
# capable of calling the linked error signature. This opens the
11401139
# possibility that the task is called more than once but that's better

t/integration/tasks.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -301,11 +301,6 @@ def fail_replaced(self, *args):
301301
raise self.replace(fail.si(*args))
302302

303303

304-
@shared_task
305-
def chord_error(*args):
306-
return args
307-
308-
309304
@shared_task(bind=True)
310305
def return_priority(self, *_args):
311306
return "Priority: %s" % self.request.delivery_info['priority']
@@ -385,3 +380,15 @@ def _recurse(sig):
385380
if isinstance(sig, chord):
386381
_recurse(sig.body)
387382
_recurse(sig_obj)
383+
384+
385+
@shared_task
386+
def errback_old_style(request_id):
387+
redis_count(request_id)
388+
return request_id
389+
390+
391+
@shared_task
392+
def errback_new_style(request, exc, tb):
393+
redis_count(request.id)
394+
return request.id

0 commit comments

Comments
 (0)