Skip to content

Commit 8b16513

Browse files
authored
simplify routes (#1650)
* simplify routes * add comment * add request_id to v1 invoke * fix request_id * fix build * returning no content if there is no output * update tensorlake * update deps
1 parent f77aa65 commit 8b16513

File tree

9 files changed

+110
-20
lines changed

9 files changed

+110
-20
lines changed

indexify/poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

indexify/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ psutil = "^7.0.0"
2929
boto3 = "^1.39.15"
3030
# Adds function-executor binary, utils lib, sdk used in indexify-cli commands.
3131
# We need to specify the tensorlake version exactly because pip install doesn't respect poetry.lock files.
32-
tensorlake = "0.2.33"
32+
tensorlake = "0.2.35"
3333
# Uncomment the next line to use local tensorlake package (only for development!)
3434
# tensorlake = { path = "../tensorlake", develop = true }
3535
# grpcio is provided by tensorlake

server/src/http_objects_v1.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl From<GraphInvocationCtx> for ShallowGraphRequest {
122122
id: ctx.invocation_id.to_string(),
123123
created_at: ctx.created_at,
124124
status: if ctx.completed {
125-
RequestStatus::Finalized
125+
RequestStatus::Complete
126126
} else if ctx.outstanding_tasks > 0 {
127127
RequestStatus::Running
128128
} else {
@@ -170,13 +170,15 @@ pub struct Tasks {
170170
}
171171

172172
#[derive(Debug, Serialize, Deserialize, ToSchema, Clone)]
173+
#[serde(rename_all = "lowercase")]
173174
pub enum RequestStatus {
174175
Pending,
175176
Running,
176-
Finalized,
177+
Complete,
177178
}
178179

179180
#[derive(Debug, Serialize, Deserialize, ToSchema, Clone)]
181+
#[serde(rename_all = "lowercase")]
180182
pub enum RequestOutcome {
181183
Undefined,
182184
Success,
@@ -194,6 +196,7 @@ impl From<GraphInvocationOutcome> for RequestOutcome {
194196
}
195197

196198
#[derive(Debug, Serialize, Deserialize, ToSchema, Clone)]
199+
#[serde(rename_all = "lowercase")]
197200
pub enum RequestFailureReason {
198201
Unknown,
199202
InternalError,
@@ -257,7 +260,7 @@ impl Request {
257260
);
258261
}
259262
let status = if ctx.completed {
260-
RequestStatus::Finalized
263+
RequestStatus::Complete
261264
} else if ctx.outstanding_tasks > 0 {
262265
RequestStatus::Running
263266
} else {

server/src/routes/download.rs

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ use anyhow::anyhow;
22
use axum::{
33
body::Body,
44
extract::{Path, State},
5-
response::Response,
5+
response::{IntoResponse, NoContent, Response},
66
};
77
use futures::TryStreamExt;
88

99
use super::routes_state::RouteState;
1010
use crate::{
1111
blob_store::BlobStorage,
12-
data_model::GraphInvocationError,
12+
data_model::{DataPayload, GraphInvocationError},
1313
http_objects::{IndexifyAPIError, RequestError},
1414
};
1515

@@ -120,7 +120,7 @@ pub async fn download_fn_output_payload(
120120
.map_err(|e| IndexifyAPIError::internal_error_str(&e.to_string()))
121121
}
122122

123-
/// Get function output
123+
/// Get function output by index
124124
#[utoipa::path(
125125
get,
126126
path = "v1/namespaces/{namespace}/compute-graphs/{compute_graph}/requests/{request_id}/fn/{fn_name}/outputs/{id}/index/{index}",
@@ -159,14 +159,60 @@ pub async fn v1_download_fn_output_payload(
159159
let encoding = output.encoding.clone();
160160

161161
let payload = &output.payloads[index];
162-
let storage_reader = state
163-
.blob_storage
164-
.get_blob_store(&namespace)
162+
let blob_storage = state.blob_storage.get_blob_store(&namespace);
163+
stream_data_payload(&payload, &blob_storage, &encoding).await
164+
}
165+
166+
/// Get function output
167+
#[utoipa::path(
168+
get,
169+
path = "v1/namespaces/{namespace}/compute-graphs/{compute_graph}/requests/{request_id}/output/{fn_name}",
170+
tag = "retrieve",
171+
responses(
172+
(status = 200, description = "function output"),
173+
(status = INTERNAL_SERVER_ERROR, description = "internal server error")
174+
),
175+
)]
176+
pub async fn v1_download_fn_output_payload_simple(
177+
Path((namespace, compute_graph, invocation_id, fn_name)): Path<(
178+
String,
179+
String,
180+
String,
181+
String,
182+
)>,
183+
State(state): State<RouteState>,
184+
) -> Result<Response<Body>, IndexifyAPIError> {
185+
let output = state
186+
.indexify_state
187+
.reader()
188+
.fn_output_payload_first(&namespace, &compute_graph, &invocation_id, &fn_name)
189+
.map_err(|e| {
190+
IndexifyAPIError::internal_error(anyhow!(
191+
"failed to download invocation payload: {}",
192+
e
193+
))
194+
})?;
195+
let Some(output) = output else {
196+
return Ok(NoContent.into_response());
197+
};
198+
199+
let encoding = output.encoding.clone();
200+
201+
let payload = &output.payloads[0];
202+
let blob_storage = state.blob_storage.get_blob_store(&namespace);
203+
stream_data_payload(&payload, &blob_storage, &encoding).await
204+
}
205+
206+
async fn stream_data_payload(
207+
payload: &DataPayload,
208+
blob_storage: &BlobStorage,
209+
encoding: &str,
210+
) -> Result<Response<Body>, IndexifyAPIError> {
211+
let storage_reader = blob_storage
165212
.get(&payload.path)
166213
.await
167214
.map_err(IndexifyAPIError::internal_error)?;
168215

169-
// Check if the content type is JSON
170216
if encoding == "application/json" {
171217
let json_bytes = storage_reader
172218
.map_ok(|chunk| chunk.to_vec())

server/src/routes/invoke.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use axum::{
99
Json,
1010
};
1111
use futures::{Stream, StreamExt};
12-
use serde::Deserialize;
12+
use serde::{Deserialize, Serialize};
1313
use tokio::sync::broadcast::{error::RecvError, Receiver};
1414
use tracing::{error, info, warn};
1515
use uuid::Uuid;
@@ -134,6 +134,12 @@ async fn create_invocation_progress_stream(
134134
}
135135
}
136136

137+
#[derive(Serialize)]
138+
struct RequestIdV1 {
139+
// FIXME: Remove this once we migrate clients off this.
140+
id: String,
141+
request_id: String,
142+
}
137143
/// Make a request to a workflow
138144
#[utoipa::path(
139145
post,
@@ -244,8 +250,9 @@ pub async fn invoke_with_object_v1(
244250
})?;
245251

246252
if accept_header.contains("application/json") {
247-
return Ok(Json(RequestId {
248-
id: graph_invocation_ctx.invocation_id,
253+
return Ok(Json(RequestIdV1 {
254+
id: graph_invocation_ctx.invocation_id.clone(),
255+
request_id: graph_invocation_ctx.invocation_id.clone(),
249256
})
250257
.into_response());
251258
}

server/src/routes_v1.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::{
4646
http_objects_v1::{self, GraphRequests},
4747
routes::{
4848
compute_graphs::{self, create_or_update_compute_graph_v1},
49-
download::{self, v1_download_fn_output_payload},
49+
download::{self, v1_download_fn_output_payload, v1_download_fn_output_payload_simple},
5050
invoke::{self, progress_stream},
5151
routes_state::RouteState,
5252
},
@@ -163,10 +163,19 @@ fn v1_namespace_routes(route_state: RouteState) -> Router {
163163
"/compute-graphs/{compute_graph}/requests/{request_id}/tasks",
164164
get(list_tasks).with_state(route_state.clone()),
165165
)
166+
// FIXME: remove this route once we migrate tensorlake sdk to this
166167
.route(
167168
"/compute-graphs/{compute_graph}/requests/{request_id}/fn/{fn_name}/outputs/{id}/index/{index}",
168169
get(v1_download_fn_output_payload).with_state(route_state.clone()),
169170
)
171+
.route(
172+
"/compute-graphs/{compute_graph}/requests/{request_id}/output/{fn_name}/id/{id}/index/{index}",
173+
get(v1_download_fn_output_payload).with_state(route_state.clone()),
174+
)
175+
.route(
176+
"/compute-graphs/{compute_graph}/requests/{request_id}/output/{fn_name}",
177+
get(v1_download_fn_output_payload_simple).with_state(route_state.clone()),
178+
)
170179
.layer(middleware::from_fn(move |rpp, r, n| {
171180
namespace_middleware(route_state.clone(), rpp, r, n)
172181
}))

server/src/state_store/invocation_events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ pub struct TaskAssigned {
9595
}
9696

9797
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
98+
#[serde(rename_all = "lowercase")]
9899
pub enum TaskOutcomeSummary {
99100
Unknown,
100101
Success,

server/src/state_store/scanner.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,30 @@ impl StateReader {
714714
None => Ok(None),
715715
}
716716
}
717+
718+
pub fn fn_output_payload_first(
719+
&self,
720+
namespace: &str,
721+
compute_graph: &str,
722+
invocation_id: &str,
723+
compute_fn: &str,
724+
) -> Result<Option<NodeOutput>> {
725+
let kvs = &[KeyValue::new("op", "fn_output_payload_first")];
726+
let _timer = Timer::start_with_labels(&self.metrics.state_read, kvs);
727+
728+
let key = format!(
729+
"{}|{}|{}|{}",
730+
namespace, compute_graph, invocation_id, compute_fn
731+
);
732+
733+
let (node_outputs, _) = self.get_rows_from_cf_with_limits::<NodeOutput>(
734+
key.as_bytes(),
735+
None,
736+
IndexifyObjectsColumns::FnOutputs,
737+
Some(1),
738+
)?;
739+
Ok(node_outputs.first().cloned())
740+
}
717741
}
718742

719743
#[cfg(test)]

0 commit comments

Comments
 (0)