Skip to content

Commit 2454090

Browse files
committed
Address some issues with error handling
1 parent 8ec7ec1 commit 2454090

File tree

1 file changed

+20
-12
lines changed

1 file changed

+20
-12
lines changed

src/dispatch/scheduler.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def _run(self, input: Input) -> Output:
222222
future = owner.result
223223
assert future is not None
224224
except (KeyError, AssertionError):
225-
logger.warning("skipping unexpected call result %s", cr)
225+
logger.warning("discarding unexpected call result %s", cr)
226226
continue
227227

228228
logger.debug("dispatching %s to %s", call_result, owner)
@@ -254,7 +254,6 @@ def _run(self, input: Input) -> Output:
254254
coroutine_id=coroutine.id, value=e.value
255255
)
256256
except Exception as e:
257-
raise
258257
coroutine_result = CoroutineResult(coroutine_id=coroutine.id, error=e)
259258

260259
# Handle coroutines that return or raise.
@@ -266,23 +265,27 @@ def _run(self, input: Input) -> Output:
266265

267266
# If this is the main coroutine, we're done.
268267
if coroutine.parent_id is None:
269-
assert len(state.suspended) == 0
268+
for suspended in state.suspended.values():
269+
suspended.coroutine.close()
270270
if coroutine_result.error is not None:
271271
return Output.error(
272272
Error.from_exception(coroutine_result.error)
273273
)
274274
return Output.value(coroutine_result.value)
275275

276276
# Otherwise, notify the parent of the result.
277-
assert coroutine.parent_id in state.suspended
278-
parent = state.suspended[coroutine.parent_id]
279-
assert parent.result is not None
280-
future = parent.result
281-
future.add(coroutine_result)
282-
if future.ready():
283-
state.ready.insert(0, parent)
284-
del state.suspended[parent.id]
285-
logger.debug("parent %s is now ready", parent)
277+
try:
278+
parent = state.suspended[coroutine.parent_id]
279+
future = parent.result
280+
assert future is not None
281+
except (KeyError, AssertionError):
282+
logger.warning("discarding %s", coroutine_result)
283+
else:
284+
future.add(coroutine_result)
285+
if future.ready():
286+
state.ready.insert(0, parent)
287+
del state.suspended[parent.id]
288+
logger.debug("parent %s is now ready", parent)
286289
continue
287290

288291
# Handle coroutines that yield.
@@ -343,6 +346,11 @@ def _run(self, input: Input) -> Output:
343346
logger.exception("state could not be serialized")
344347
return Output.error(Error.from_exception(e, status=Status.PERMANENT_ERROR))
345348

349+
# Close coroutines before yielding.
350+
for suspended in state.suspended.values():
351+
suspended.coroutine.close()
352+
state.suspended = {}
353+
346354
# Yield to Dispatch.
347355
logger.debug(
348356
"yielding to Dispatch with %d call(s) and %d bytes of state",

0 commit comments

Comments
 (0)