
Commit c4bbde6

Invocation errors and refactoring of Function Executor and Executor protocols (#1499)
* Implement InvocationError. If InvocationError is raised by customer function code, its current invocation fails. InvocationError requires a string message argument that can be retrieved via the Server API and the Tensorlake CLI. Also refactor the Function Executor and Executor gRPC protocols; we don't need backward compatibility right now, so this is a good window to do it. The CI tests will fail until we deploy the required changes in Indexify (due to the FE and Executor gRPC protocol changes).
* Remove the Server-side routing fallback. Function Executor now always provides next_functions in its run task response. The next functions are either produced by the function or taken from the edges of the graph definition object. This gives FE full control over next_functions, with the Server simply following its decisions. It moves ownership of the next_functions list to a single place (FE/SDK) and also simplifies the FE and Executor protocols.
* Use DataPayload for the InvocationError message/payload. This makes the size of the InvocationError message much more flexible and lets the SDK own what can be stored there. Users can now attach a string message of any size to InvocationError, and in the future we can easily allow DataModels there if needed. It also removes the need to store the InvocationError message in the Server state store; all we need to do is put a DataPayload there.
* Implement InvocationError on the Server side. Ingest the invocation error message payload, fail the graph invocation if an invocation error happened, and add invocation error details to the HTTP API invocation object.
* Use pypi tensorlake instead of local. This was used for local development and shouldn't be merged.
* Add Executor proto enum comments and Server enum comments.
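For orientation, a minimal sketch of how customer function code might raise the new error. The import path and the tensorlake_function decorator usage are assumptions about the public tensorlake SDK, not code from this commit; only the "InvocationError takes a string message and fails the whole invocation" contract is taken from the description above.

# Sketch only: the exact module that exports InvocationError in the
# tensorlake SDK may differ from what is shown here.
from tensorlake import InvocationError, tensorlake_function

@tensorlake_function()
def charge_customer(order_id: str) -> str:
    if not order_id:
        # Raising InvocationError fails the whole invocation, not just this task;
        # the message becomes retrievable via the Server API and the Tensorlake CLI.
        raise InvocationError("order_id must not be empty")
    return f"charged {order_id}"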
1 parent 8910825 commit c4bbde6

29 files changed (+757, -315 lines)

indexify/src/indexify/executor/executor.py

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ def __init__(
         blob_store: BLOBStore,
         host_resources_provider: HostResourcesProvider,
     ):
-        self._logger = structlog.get_logger(module=__name__)
+        self._logger = structlog.get_logger(module=__name__, executor_id=id)
         protocol: str = "http"
        if config_path:
             self._logger.info("running the extractor with TLS enabled")
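The one-line change above binds the executor id into the logger's initial context. A small standalone structlog sketch (not code from this commit) of what that buys; the id shown is made up:

# Standalone illustration of structlog bound initial values.
import structlog

logger = structlog.get_logger(module="executor", executor_id="executor-123")
logger.info("running the extractor with TLS enabled")
# Every event logged through this logger now carries executor_id, e.g.:
# event='running the extractor with TLS enabled' module='executor' executor_id='executor-123'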

indexify/src/indexify/executor/function_executor/function_executor.py

Lines changed: 35 additions & 7 deletions
@@ -7,6 +7,8 @@
 from tensorlake.function_executor.proto.function_executor_pb2 import (
     InfoRequest,
     InfoResponse,
+    InitializationFailureReason,
+    InitializationOutcomeCode,
     InitializeRequest,
     InitializeResponse,
 )
@@ -315,16 +317,42 @@ async def _initialize_server(
             initialize_request,
             timeout=customer_code_timeout_sec,
         )
-        # TODO: set real stdout and stderr when their proper capturing on FE initialization is implemented.
-        if initialize_response.success:
-            return FunctionExecutorInitializationResult()
-        elif initialize_response.HasField("customer_error"):
+
+        if (
+            initialize_response.outcome_code
+            == InitializationOutcomeCode.INITIALIZE_OUTCOME_CODE_SUCCESS
+        ):
             return FunctionExecutorInitializationResult(
-                error=FunctionExecutorInitializationError.FUNCTION_ERROR,
-                stderr=initialize_response.customer_error,
+                stdout=initialize_response.stdout, stderr=initialize_response.stderr
             )
+        elif (
+            initialize_response.outcome_code
+            == InitializationOutcomeCode.INITIALIZE_OUTCOME_CODE_FAILURE
+        ):
+            if (
+                initialize_response.failure_reason
+                == InitializationFailureReason.INITIALIZATION_FAILURE_REASON_FUNCTION_ERROR
+            ):
+                return FunctionExecutorInitializationResult(
+                    error=FunctionExecutorInitializationError.FUNCTION_ERROR,
+                    stdout=initialize_response.stdout,
+                    stderr=initialize_response.stderr,
+                )
+            elif (
+                initialize_response.failure_reason
+                == InitializationFailureReason.INITIALIZATION_FAILURE_REASON_INTERNAL_ERROR
+            ):
+                # Don't add stdout/stderr because this is customer data.
+                raise RuntimeError("initialize RPC failed with internal error")
+            else:
+                raise ValueError(
+                    f"unexpected failure reason {InitializationFailureReason.Name(initialize_response.failure_reason)} in initialize RPC response"
+                )
         else:
-            raise Exception("initialize RPC failed at function executor server")
+            raise ValueError(
+                f"unexpected outcome code {InitializationOutcomeCode.Name(initialize_response.outcome_code)} in initialize RPC response"
+            )
+
     except grpc.aio.AioRpcError as e:
         if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
             return FunctionExecutorInitializationResult(
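To make the new branches above concrete, a hedged sketch of the InitializeResponse a Function Executor might send back when customer code fails during initialization. It only uses field and enum names visible in the diff; the captured_stdout/captured_stderr variables are hypothetical placeholders, and this is not code from the commit.

# Illustrative only: how the Function Executor side could populate the new
# outcome/failure fields on a customer-code initialization failure.
initialize_response = InitializeResponse(
    outcome_code=InitializationOutcomeCode.INITIALIZE_OUTCOME_CODE_FAILURE,
    failure_reason=InitializationFailureReason.INITIALIZATION_FAILURE_REASON_FUNCTION_ERROR,
    stdout=captured_stdout,   # hypothetical captured output
    stderr=captured_stderr,   # hypothetical captured output
)
# The Executor maps this to FunctionExecutorInitializationResult(
#     error=FunctionExecutorInitializationError.FUNCTION_ERROR, ...) as shown above.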

indexify/src/indexify/executor/function_executor/invocation_state_client.py

Lines changed: 26 additions & 16 deletions
@@ -8,6 +8,7 @@
     InvocationStateRequest,
     InvocationStateResponse,
     SerializedObject,
+    SerializedObjectEncoding,
     SetInvocationStateResponse,
 )
 from tensorlake.function_executor.proto.function_executor_pb2_grpc import (
@@ -25,6 +26,10 @@
     metric_server_set_state_requests,
 )
 
+# We're currently only supporting CloudPickle for invocation state values.
+# FIXME: if Function Executor sends us something else then we fail the calls.
+_VALUE_CONTENT_TYPE = "application/octet-stream"
+
 
 class InvocationStateClient:
     """InvocationStateClient is a client for the invocation state server of a Function Executor.
@@ -196,14 +201,21 @@ async def _set_server_state(
         url: str = (
             f"{self._base_url}/internal/namespaces/{self._namespace}/compute_graphs/{self._graph}/invocations/{invocation_id}/ctx/{key}"
         )
-        payload = value.bytes if value.HasField("bytes") else value.string
+        if (
+            value.encoding
+            != SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_BINARY_PICKLE
+        ):
+            raise ValueError(
+                f"Unsupported value encoding: {SerializedObjectEncoding.Name(value.encoding)}. "
+                "Only binary pickle is supported for invocation state values."
+            )
 
         response = await self._http_client.post(
             url=url,
             files=[
                 (
                     "value",
-                    ("value", payload, value.content_type),
+                    ("value", value.data, _VALUE_CONTENT_TYPE),
                 ),
             ],
         )
@@ -245,7 +257,7 @@ async def _get_server_state(
             )
             raise
 
-        return serialized_object_from_http_response(response)
+        return _serialized_object_from_http_response(response)
 
     def _validate_request(self, request: InvocationStateRequest) -> None:
         (
@@ -265,17 +277,15 @@ def _validate_request(self, request: InvocationStateRequest) -> None:
            raise ValueError("unknown request type")
 
 
-def serialized_object_from_http_response(response: httpx.Response) -> SerializedObject:
-    # We're hardcoding the content type currently used by Python SDK. It might change in the future.
-    # There's no other way for now to determine if the response is a bytes or string.
-    if response.headers["content-type"] in [
-        "application/octet-stream",
-        "application/pickle",
-    ]:
-        return SerializedObject(
-            bytes=response.content, content_type=response.headers["content-type"]
-        )
-    else:
-        return SerializedObject(
-            string=response.text, content_type=response.headers["content-type"]
+def _serialized_object_from_http_response(response: httpx.Response) -> SerializedObject:
+    if response.headers["content-type"] != _VALUE_CONTENT_TYPE:
+        raise ValueError(
+            f"Unexpected content type: {response.headers['content-type']}. "
+            f"Expected: {_VALUE_CONTENT_TYPE}."
         )
+
+    return SerializedObject(
+        data=response.content,
+        encoding=SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_BINARY_PICKLE,
+        encoding_version=0,
+    )
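A hedged sketch of the value shape that now flows through both directions of the invocation state round trip. Field names come from the diff above; the pickled payload and the pickle usage are illustrative assumptions, and this is not code from the commit.

# Illustrative only. The client POSTs value.data with the hardcoded
# "application/octet-stream" content type and rejects any encoding other
# than binary pickle; the GET path reconstructs the same shape.
import pickle

value = SerializedObject(
    data=pickle.dumps({"cursor": 42}),  # made-up invocation state value
    encoding=SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_BINARY_PICKLE,
    encoding_version=0,
)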

indexify/src/indexify/executor/function_executor_controller/completed_task_metrics.py

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ def emit_completed_task_metrics(task_info: TaskInfo, logger: Any) -> None:
     elif task_failure_reason in [
         TaskFailureReason.TASK_FAILURE_REASON_FUNCTION_ERROR,
         TaskFailureReason.TASK_FAILURE_REASON_FUNCTION_TIMEOUT,
+        TaskFailureReason.TASK_FAILURE_REASON_INVOCATION_ERROR,
     ]:
         metric_tasks_completed.labels(
             outcome_code=METRIC_TASKS_COMPLETED_OUTCOME_CODE_FAILURE,

indexify/src/indexify/executor/function_executor_controller/create_function_executor.py

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@
 from tensorlake.function_executor.proto.function_executor_pb2 import (
     InitializeRequest,
     SerializedObject,
+    SerializedObjectEncoding,
 )
 
 from indexify.executor.blob_store.blob_store import BLOBStore

indexify/src/indexify/executor/function_executor_controller/downloads.py

Lines changed: 20 additions & 9 deletions
@@ -4,7 +4,10 @@
 from typing import Any, Optional
 
 import nanoid
-from tensorlake.function_executor.proto.function_executor_pb2 import SerializedObject
+from tensorlake.function_executor.proto.function_executor_pb2 import (
+    SerializedObject,
+    SerializedObjectEncoding,
+)
 
 from indexify.executor.blob_store.blob_store import BLOBStore
 from indexify.proto.executor_api_pb2 import (
@@ -179,20 +182,28 @@ def _serialized_object_from_data_payload_proto(
     """
     if data_payload.encoding == DataPayloadEncoding.DATA_PAYLOAD_ENCODING_BINARY_PICKLE:
         return SerializedObject(
-            bytes=data,
-            content_type="application/octet-stream",
+            data=data,
+            encoding=SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_BINARY_PICKLE,
+            encoding_version=data_payload.encoding_version,
         )
     elif data_payload.encoding == DataPayloadEncoding.DATA_PAYLOAD_ENCODING_UTF8_TEXT:
         return SerializedObject(
-            content_type="text/plain",
-            string=data.decode("utf-8"),
+            data=data,
+            encoding=SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_UTF8_TEXT,
+            encoding_version=data_payload.encoding_version,
        )
     elif data_payload.encoding == DataPayloadEncoding.DATA_PAYLOAD_ENCODING_UTF8_JSON:
-        result = SerializedObject(
-            content_type="application/json",
-            string=data.decode("utf-8"),
+        return SerializedObject(
+            data=data,
+            encoding=SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_UTF8_JSON,
+            encoding_version=data_payload.encoding_version,
+        )
+    elif data_payload.encoding == DataPayloadEncoding.DATA_PAYLOAD_ENCODING_BINARY_ZIP:
+        return SerializedObject(
+            data=data,
+            encoding=SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_BINARY_ZIP,
+            encoding_version=data_payload.encoding_version,
         )
-        return result
 
     raise ValueError(
         f"Can't convert data payload {data_payload} into serialized object"
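The if/elif chain above is a straight one-to-one mapping between DataPayloadEncoding and SerializedObjectEncoding. A hedged alternative sketch (not the code in this commit) expressing the same mapping as a lookup table; it assumes the same imports this module already has, and the _alt function name is made up:

# Sketch only: an equivalent table-driven version of the conversion above.
_ENCODING_MAP = {
    DataPayloadEncoding.DATA_PAYLOAD_ENCODING_BINARY_PICKLE: SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_BINARY_PICKLE,
    DataPayloadEncoding.DATA_PAYLOAD_ENCODING_UTF8_TEXT: SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_UTF8_TEXT,
    DataPayloadEncoding.DATA_PAYLOAD_ENCODING_UTF8_JSON: SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_UTF8_JSON,
    DataPayloadEncoding.DATA_PAYLOAD_ENCODING_BINARY_ZIP: SerializedObjectEncoding.SERIALIZED_OBJECT_ENCODING_BINARY_ZIP,
}

def _serialized_object_from_data_payload_proto_alt(data_payload, data: bytes) -> SerializedObject:
    encoding = _ENCODING_MAP.get(data_payload.encoding)
    if encoding is None:
        raise ValueError(f"Can't convert data payload {data_payload} into serialized object")
    return SerializedObject(
        data=data,
        encoding=encoding,
        encoding_version=data_payload.encoding_version,
    )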

indexify/src/indexify/executor/function_executor_controller/function_executor_controller.py

Lines changed: 4 additions & 7 deletions
@@ -197,7 +197,6 @@ def remove_task(self, task_id: str) -> None:
         task_info.is_cancelled = True
         logger.info(
             "cancelling task",
-            allocation_id=task_info.allocation.allocation_id,
         )
         if task_info.aio_task is not None:
             task_info.aio_task.cancel()
@@ -332,7 +331,7 @@ async def _control_loop(self) -> None:
                 self._logger.error(
                     "unexpected exception in function executor controller control loop",
                     exc_info=e,
-                    fe_event=str(event),
+                    event_type=event.event_type.name,
                 )
 
     def _handle_event(self, event: BaseEvent) -> None:
@@ -786,16 +785,14 @@ def _to_task_result_proto(output: TaskOutput) -> TaskResult:
         graph_invocation_id=output.allocation.task.graph_invocation_id,
         reducer=output.reducer,
         outcome_code=output.outcome_code,
-        next_functions=(output.router_output.edges if output.router_output else []),
+        failure_reason=output.failure_reason,
+        next_functions=output.next_functions,
         function_outputs=output.uploaded_data_payloads,
+        invocation_error_output=output.uploaded_invocation_error_output,
     )
-    if output.failure_reason is not None:
-        task_result.failure_reason = output.failure_reason
     if output.uploaded_stdout is not None:
         task_result.stdout.CopyFrom(output.uploaded_stdout)
     if output.uploaded_stderr is not None:
         task_result.stderr.CopyFrom(output.uploaded_stderr)
-    if output.router_output is not None:
-        task_result.routing.next_functions[:] = output.router_output.edges
 
     return task_result
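For illustration, a hedged sketch of the TaskResult proto this reporting path now produces for a task whose function raised InvocationError. Only the fields visible in the diffs of this commit are shown; the values and the uploaded_invocation_error_payload variable are made up, and the empty next_functions follows from the commit description (the whole invocation fails).

# Illustrative values only; this is not code from the commit.
task_result = TaskResult(
    outcome_code=TaskOutcomeCode.TASK_OUTCOME_CODE_FAILURE,
    failure_reason=TaskFailureReason.TASK_FAILURE_REASON_INVOCATION_ERROR,
    next_functions=[],        # the whole invocation fails, so nothing runs next
    function_outputs=[],
    invocation_error_output=uploaded_invocation_error_payload,  # hypothetical DataPayload
)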

indexify/src/indexify/executor/function_executor_controller/run_task.py

Lines changed: 68 additions & 20 deletions
@@ -6,6 +6,13 @@
 from tensorlake.function_executor.proto.function_executor_pb2 import (
     RunTaskRequest,
     RunTaskResponse,
+    SerializedObject,
+)
+from tensorlake.function_executor.proto.function_executor_pb2 import (
+    TaskFailureReason as FETaskFailureReason,
+)
+from tensorlake.function_executor.proto.function_executor_pb2 import (
+    TaskOutcomeCode as FETaskOutcomeCode,
 )
 from tensorlake.function_executor.proto.function_executor_pb2_grpc import (
     FunctionExecutorStub,
@@ -83,6 +90,7 @@ async def run_task_on_function_executor(
         task_info.output = _task_output_from_function_executor_response(
             allocation=task_info.allocation,
             response=response,
+            logger=logger,
         )
     except grpc.aio.AioRpcError as e:
         if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
@@ -140,45 +148,48 @@ async def run_task_on_function_executor(
 
 
 def _task_output_from_function_executor_response(
-    allocation: TaskAllocation, response: RunTaskResponse
+    allocation: TaskAllocation, response: RunTaskResponse, logger: Any
 ) -> TaskOutput:
     response_validator = MessageValidator(response)
     response_validator.required_field("stdout")
     response_validator.required_field("stderr")
     response_validator.required_field("is_reducer")
-    response_validator.required_field("success")
+    response_validator.required_field("outcome_code")
 
     metrics = TaskMetrics(counters={}, timers={})
     if response.HasField("metrics"):
         # Can be None if e.g. function failed.
         metrics.counters = dict(response.metrics.counters)
         metrics.timers = dict(response.metrics.timers)
 
-    output = TaskOutput(
+    outcome_code: TaskOutcomeCode = _to_task_outcome_code(
+        response.outcome_code, logger=logger
+    )
+    failure_reason: Optional[TaskFailureReason] = None
+    invocation_error_output: Optional[SerializedObject] = None
+
+    if outcome_code == TaskOutcomeCode.TASK_OUTCOME_CODE_FAILURE:
+        response_validator.required_field("failure_reason")
+        failure_reason: Optional[TaskFailureReason] = _to_task_failure_reason(
+            response.failure_reason, logger
+        )
+        if failure_reason == TaskFailureReason.TASK_FAILURE_REASON_INVOCATION_ERROR:
+            response_validator.required_field("invocation_error_output")
+            invocation_error_output = response.invocation_error_output
+
+    return TaskOutput(
         allocation=allocation,
-        outcome_code=(
-            TaskOutcomeCode.TASK_OUTCOME_CODE_SUCCESS
-            if response.success
-            else TaskOutcomeCode.TASK_OUTCOME_CODE_FAILURE
-        ),
-        failure_reason=(
-            None
-            if response.success
-            else TaskFailureReason.TASK_FAILURE_REASON_FUNCTION_ERROR
-        ),
+        outcome_code=outcome_code,
+        failure_reason=failure_reason,
+        invocation_error_output=invocation_error_output,
+        function_outputs=response.function_outputs,
+        next_functions=response.next_functions,
         stdout=response.stdout,
         stderr=response.stderr,
        reducer=response.is_reducer,
         metrics=metrics,
     )
 
-    if response.HasField("function_output"):
-        output.function_output = response.function_output
-    if response.HasField("router_output"):
-        output.router_output = response.router_output
-
-    return output
-
 
 def _log_task_execution_finished(output: TaskOutput, logger: Any) -> None:
     logger.info(
@@ -191,3 +202,40 @@ def _log_task_execution_finished(output: TaskOutput, logger: Any) -> None:
             else None
         ),
     )
+
+
+def _to_task_outcome_code(
+    fe_task_outcome_code: FETaskOutcomeCode, logger
+) -> TaskOutcomeCode:
+    if fe_task_outcome_code == FETaskOutcomeCode.TASK_OUTCOME_CODE_SUCCESS:
+        return TaskOutcomeCode.TASK_OUTCOME_CODE_SUCCESS
+    elif fe_task_outcome_code == FETaskOutcomeCode.TASK_OUTCOME_CODE_FAILURE:
+        return TaskOutcomeCode.TASK_OUTCOME_CODE_FAILURE
+    else:
+        logger.warning(
+            "Unknown TaskOutcomeCode received from Function Executor",
+            value=FETaskOutcomeCode.Name(fe_task_outcome_code),
+        )
+        return TaskOutcomeCode.TASK_OUTCOME_CODE_UNKNOWN
+
+
+def _to_task_failure_reason(
+    fe_task_failure_reason: FETaskFailureReason, logger: Any
+) -> TaskFailureReason:
+    if fe_task_failure_reason == FETaskFailureReason.TASK_FAILURE_REASON_FUNCTION_ERROR:
+        return TaskFailureReason.TASK_FAILURE_REASON_FUNCTION_ERROR
+    elif (
+        fe_task_failure_reason
+        == FETaskFailureReason.TASK_FAILURE_REASON_INVOCATION_ERROR
+    ):
+        return TaskFailureReason.TASK_FAILURE_REASON_INVOCATION_ERROR
+    elif (
+        fe_task_failure_reason == FETaskFailureReason.TASK_FAILURE_REASON_INTERNAL_ERROR
+    ):
+        return TaskFailureReason.TASK_FAILURE_REASON_INTERNAL_ERROR
+    else:
+        logger.warning(
+            "Unknown TaskFailureReason received from Function Executor",
+            value=FETaskFailureReason.Name(fe_task_failure_reason),
+        )
        return TaskFailureReason.TASK_FAILURE_REASON_UNKNOWN
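To show the intent of the two translation helpers above, a hedged usage sketch follows. It assumes a structlog logger (as used elsewhere in the Executor); the point is that unrecognized enum values degrade to the *_UNKNOWN variants with a warning instead of raising.

# Illustrative only; exercises the module-private helpers defined above.
import structlog

logger = structlog.get_logger(module="run_task_example")

assert (
    _to_task_outcome_code(FETaskOutcomeCode.TASK_OUTCOME_CODE_SUCCESS, logger=logger)
    == TaskOutcomeCode.TASK_OUTCOME_CODE_SUCCESS
)
# A value this Executor build does not know about (e.g. one added by a newer
# Function Executor) is logged and mapped to the corresponding *_UNKNOWN enum
# value rather than crashing the task result handling.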
