Skip to content

Commit a952c3d

Browse files
authored
Add support for JSON and SSE responses to invoke endpoint (#1643)
* Add support for JSON and SSE responses to invoke endpoint
* Update API endpoint for storing and getting parameters and return type
* Using application/json as default accept header
* Using the new API in fe scaling test
* backward compatibility for invoke endpoint for inkwell
* Add output uri paths to task event
* remove return
1 parent 976da6b commit a952c3d

File tree

11 files changed

+234
-98
lines changed

11 files changed

+234
-98
lines changed

.vscode/settings.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,19 @@
22
"python.analysis.extraPaths": [
33
"./indexify/src",
44
"./tensorlake/src"
5+
],
6+
"cSpell.words": [
7+
"cpus",
8+
"nanos",
9+
"readahead",
10+
"retriable",
11+
"rocksdb",
12+
"sched",
13+
"schedulable",
14+
"strs",
15+
"tensorlake",
16+
"tombstoned",
17+
"Upserted",
18+
"utoipa"
519
]
620
}

indexify/tests/cli/test_server_fe_scaling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def test_server_uses_the_same_function_executor_if_fe_task_queue_doesnt_overflow
8080

8181
invocation_ids: List[str] = []
8282
for _ in range(_FE_ALLOCATIONS_QUEUE_SIZE):
83-
invocation_id = graph.run(block_until_done=False, sleep_secs=0.01)
83+
invocation_id = graph.call(sleep_secs=0.01)
8484
invocation_ids.append(invocation_id)
8585

8686
fe_ids: Set[str] = set()

server/src/data_model/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,10 @@ pub struct ComputeFn {
339339
#[serde(default)]
340340
pub retry_policy: FunctionRetryPolicy,
341341
pub cache_key: Option<CacheKey>,
342+
#[serde(default)]
343+
pub parameters: Vec<ParameterMetadata>,
344+
#[serde(default)]
345+
pub return_type: Option<serde_json::Value>,
342346
}
343347

344348
impl ComputeFn {
@@ -414,6 +418,14 @@ pub struct RuntimeInformation {
414418
pub sdk_version: String,
415419
}
416420

421+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
422+
pub struct ParameterMetadata {
423+
pub name: String,
424+
pub description: Option<String>,
425+
pub required: bool,
426+
pub data_type: serde_json::Value,
427+
}
428+
417429
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
418430
pub struct ComputeGraph {
419431
pub namespace: String,
@@ -1048,7 +1060,7 @@ pub enum TaskFailureReason {
10481060
InternalError,
10491061
// Clear function code failure typically by raising an exception from the function code.
10501062
FunctionError,
1051-
// Function code run time exceeded its cofigured timeout.
1063+
// Function code run time exceeded its configured timeout.
10521064
FunctionTimeout,
10531065
// Function code raised InvocationError to mark the invocation as permanently failed.
10541066
InvocationError,

server/src/http_objects.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,10 @@ pub struct ComputeFn {
424424
pub retry_policy: NodeRetryPolicy,
425425
#[serde(rename = "cache_key")]
426426
pub cache_key: Option<CacheKey>,
427+
#[serde(default)]
428+
pub parameters: Vec<ParameterMetadata>,
429+
#[serde(default)]
430+
pub return_type: Option<serde_json::Value>,
427431
}
428432

429433
impl From<ComputeFn> for data_model::ComputeFn {
@@ -442,6 +446,8 @@ impl From<ComputeFn> for data_model::ComputeFn {
442446
resources: val.resources.into(),
443447
retry_policy: val.retry_policy.into(),
444448
cache_key: val.cache_key.and_then(|v| Some(v.into())),
449+
parameters: val.parameters.into_iter().map(|p| p.into()).collect(),
450+
return_type: val.return_type,
445451
}
446452
}
447453
}
@@ -461,6 +467,8 @@ impl From<data_model::ComputeFn> for ComputeFn {
461467
resources: c.resources.into(),
462468
retry_policy: c.retry_policy.into(),
463469
cache_key: c.cache_key.and_then(|v| Some(v.into())),
470+
parameters: c.parameters.into_iter().map(|p| p.into()).collect(),
471+
return_type: c.return_type,
464472
}
465473
}
466474
}
@@ -512,6 +520,35 @@ impl From<data_model::RuntimeInformation> for RuntimeInformation {
512520
}
513521
}
514522

523+
#[derive(Debug, Serialize, Deserialize, ToSchema, Clone)]
524+
pub struct ParameterMetadata {
525+
pub name: String,
526+
pub description: Option<String>,
527+
pub required: bool,
528+
pub data_type: serde_json::Value,
529+
}
530+
531+
impl From<data_model::ParameterMetadata> for ParameterMetadata {
532+
fn from(parameter: data_model::ParameterMetadata) -> Self {
533+
Self {
534+
name: parameter.name,
535+
description: parameter.description,
536+
required: parameter.required,
537+
data_type: parameter.data_type,
538+
}
539+
}
540+
}
541+
542+
impl From<ParameterMetadata> for data_model::ParameterMetadata {
543+
fn from(parameter: ParameterMetadata) -> Self {
544+
Self {
545+
name: parameter.name,
546+
description: parameter.description,
547+
required: parameter.required,
548+
data_type: parameter.data_type,
549+
}
550+
}
551+
}
515552
#[derive(Debug, Serialize, Deserialize, ToSchema)]
516553
pub struct ComputeGraph {
517554
pub name: String,
@@ -1136,11 +1173,6 @@ pub struct ExecutorsAllocationsResponse {
11361173
pub executors: Vec<ExecutorAllocations>,
11371174
}
11381175

1139-
#[derive(Debug, Serialize, Deserialize, ToSchema)]
1140-
pub struct RequestQueryParams {
1141-
pub block_until_finish: Option<bool>,
1142-
}
1143-
11441176
#[cfg(test)]
11451177
mod tests {
11461178
use crate::http_objects::ComputeFn;

server/src/routes/compute_graphs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,10 @@ pub async fn list_compute_graphs(
203203
/// Get a compute graph definition
204204
#[utoipa::path(
205205
get,
206-
path = "/namespaces/{namespace}/compute_graphs/{compute_graph}",
206+
path = "/v1/namespaces/{namespace}/compute-graphs/{compute_graph}",
207207
tag = "operations",
208208
responses(
209-
(status = 200, description = "compute graph definition", body = http_objects::ComputeGraph),
209+
(status = 200, description = "compute graph definition", body = http_objects_v1::ComputeGraph),
210210
(status = INTERNAL_SERVER_ERROR, description = "internal server error")
211211
),
212212
)]

server/src/routes/download.rs

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,46 +13,6 @@ use crate::{
1313
http_objects::{IndexifyAPIError, RequestError},
1414
};
1515

16-
pub async fn download_invocation_payload(
17-
Path((namespace, compute_graph, invocation_id)): Path<(String, String, String)>,
18-
State(state): State<RouteState>,
19-
) -> Result<Response<Body>, IndexifyAPIError> {
20-
let output = state
21-
.indexify_state
22-
.reader()
23-
.invocation_payload(&namespace, &compute_graph, &invocation_id)
24-
.map_err(|e| {
25-
IndexifyAPIError::internal_error(anyhow!(
26-
"failed to download invocation payload: {}",
27-
e
28-
))
29-
})?;
30-
let storage_reader = state
31-
.blob_storage
32-
.get(&output.payload.path)
33-
.await
34-
.map_err(IndexifyAPIError::internal_error)?;
35-
36-
if output.encoding == "application/json" {
37-
let json_bytes = storage_reader
38-
.map_ok(|chunk| chunk.to_vec())
39-
.try_concat()
40-
.await
41-
.map_err(|e| IndexifyAPIError::internal_error(anyhow!("Failed to read JSON: {}", e)))?;
42-
43-
return Response::builder()
44-
.header("Content-Type", output.encoding)
45-
.body(Body::from(json_bytes))
46-
.map_err(|e| IndexifyAPIError::internal_error_str(&e.to_string()));
47-
}
48-
49-
Response::builder()
50-
.header("Content-Type", output.encoding)
51-
.header("Content-Length", output.payload.size.to_string())
52-
.body(Body::from_stream(storage_reader))
53-
.map_err(|e| IndexifyAPIError::internal_error_str(&e.to_string()))
54-
}
55-
5616
pub async fn download_invocation_error(
5717
invocation_error: Option<GraphInvocationError>,
5818
blob_storage: &BlobStorage,

0 commit comments

Comments
 (0)