Commit 47dab3b

chore: cherry-pick KServe readiness (#4708) and metrics (#4400) feature commits (#4730)

Authored by GuanLuo, rmccorm4, and tanmayv25

Signed-off-by: Guan Luo <gluo@nvidia.com>
Signed-off-by: GuanLuo <41310872+GuanLuo@users.noreply.github.com>
Co-authored-by: Ryan McCormick <rmccormick@nvidia.com>
Co-authored-by: Tanmay Verma <tanmayv@nvidia.com>

1 parent 0aa88a3

7 files changed (+516 −61 lines)

lib/llm/src/entrypoint/input/grpc.rs

Lines changed: 12 additions & 3 deletions
```diff
@@ -152,9 +152,18 @@ pub async fn run(
             grpc_service
         }
     };
-    grpc_service
-        .run(distributed_runtime.primary_token())
-        .await?;
+
+    // Run both HTTP (for metrics) and gRPC servers concurrently
+    let http_service = grpc_service.http_service().clone();
+    let shutdown_token = distributed_runtime.primary_token();
+
+    // Wait for both servers to complete, propagating the first error if any occurs
+    // Both tasks should run indefinitely until cancelled by the shutdown token
+    tokio::try_join!(
+        grpc_service.run(shutdown_token.clone()),
+        http_service.run(shutdown_token)
+    )?;
+
     distributed_runtime.shutdown(); // Cancel primary token
     Ok(())
 }
```
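
The change above swaps a single blocking `grpc_service.run(...).await?` for `tokio::try_join!` over two futures that share one `CancellationToken`. A minimal, self-contained sketch of that shutdown pattern (the `serve` function, names, and timing are illustrative only, not this codebase's API; assumes the `tokio`, `tokio-util`, and `anyhow` crates):

```rust
use tokio_util::sync::CancellationToken;

// Stand-in for a server loop: runs until the shared token is cancelled.
async fn serve(name: &'static str, token: CancellationToken) -> anyhow::Result<()> {
    token.cancelled().await;
    println!("{name} shut down cleanly");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let token = CancellationToken::new();

    // Simulate the primary token being cancelled (e.g. on SIGTERM).
    let shutdown = token.clone();
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        shutdown.cancel();
    });

    // As in the diff: if either server fails first, try_join! returns that
    // error; otherwise both run until the shared token stops them.
    tokio::try_join!(
        serve("grpc", token.clone()),
        serve("http-metrics", token)
    )?;
    Ok(())
}
```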

lib/llm/src/grpc/protos/kserve.proto

Lines changed: 104 additions & 0 deletions
```diff
@@ -16,6 +16,27 @@ import "model_config.proto";
 //@@
 service GRPCInferenceService
 {
+  //@@  .. cpp:var:: rpc ServerLive(ServerLiveRequest) returns
+  //@@       (ServerLiveResponse)
+  //@@
+  //@@     Check liveness of the inference server.
+  //@@
+  rpc ServerLive(ServerLiveRequest) returns (ServerLiveResponse) {}
+
+  //@@  .. cpp:var:: rpc ServerReady(ServerReadyRequest) returns
+  //@@       (ServerReadyResponse)
+  //@@
+  //@@     Check readiness of the inference server.
+  //@@
+  rpc ServerReady(ServerReadyRequest) returns (ServerReadyResponse) {}
+
+  //@@  .. cpp:var:: rpc ModelReady(ModelReadyRequest) returns
+  //@@       (ModelReadyResponse)
+  //@@
+  //@@     Check readiness of a model in the inference server.
+  //@@
+  rpc ModelReady(ModelReadyRequest) returns (ModelReadyResponse) {}
+
   //@@  .. cpp:var:: rpc ModelMetadata(ModelMetadataRequest) returns
   //@@       (ModelMetadataResponse)
   //@@
@@ -45,6 +66,89 @@ service GRPCInferenceService
   rpc ModelConfig(ModelConfigRequest) returns (ModelConfigResponse) {}
 }
 
+//@@
+//@@.. cpp:var:: message ServerLiveRequest
+//@@
+//@@   Request message for ServerLive.
+//@@
+message ServerLiveRequest {}
+
+//@@
+//@@.. cpp:var:: message ServerLiveResponse
+//@@
+//@@   Response message for ServerLive.
+//@@
+message ServerLiveResponse
+{
+  //@@
+  //@@  .. cpp:var:: bool live
+  //@@
+  //@@     True if the inference server is live, false if not live.
+  //@@
+  bool live = 1;
+}
+
+//@@
+//@@.. cpp:var:: message ServerReadyRequest
+//@@
+//@@   Request message for ServerReady.
+//@@
+message ServerReadyRequest {}
+
+//@@
+//@@.. cpp:var:: message ServerReadyResponse
+//@@
+//@@   Response message for ServerReady.
+//@@
+message ServerReadyResponse
+{
+  //@@
+  //@@  .. cpp:var:: bool ready
+  //@@
+  //@@     True if the inference server is ready, false if not ready. The server
+  //@@     is considered ready if it has any registered models, since models
+  //@@     can freely be registered and unregistered at runtime.
+  //@@
+  bool ready = 1;
+}
+
+//@@
+//@@.. cpp:var:: message ModelReadyRequest
+//@@
+//@@   Request message for ModelReady.
+//@@
+message ModelReadyRequest
+{
+  //@@
+  //@@  .. cpp:var:: string name
+  //@@
+  //@@     The name of the model to check for readiness.
+  //@@
+  string name = 1;
+
+  //@@  .. cpp:var:: string version
+  //@@
+  //@@     The version of the model to check for readiness. If not given the
+  //@@     server will choose a version based on the model and internal policy.
+  //@@
+  string version = 2;
+}
+
+//@@
+//@@.. cpp:var:: message ModelReadyResponse
+//@@
+//@@   Response message for ModelReady.
+//@@
+message ModelReadyResponse
+{
+  //@@
+  //@@  .. cpp:var:: bool ready
+  //@@
+  //@@     True if the model is ready, false if not ready.
+  //@@
+  bool ready = 1;
+}
+
 //@@
 //@@.. cpp:var:: message ModelMetadataRequest
 //@@
```
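
To see how a client would exercise the three new health RPCs, here is a hypothetical tonic-based caller. It assumes the proto's package is `inference`, that it was compiled with `tonic-build`, and that the server listens on `127.0.0.1:8787`; the model name is a placeholder:

```rust
// Generated client/message types from kserve.proto (assumed package name).
pub mod inference {
    tonic::include_proto!("inference");
}

use inference::grpc_inference_service_client::GrpcInferenceServiceClient;
use inference::{ModelReadyRequest, ServerLiveRequest, ServerReadyRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = GrpcInferenceServiceClient::connect("http://127.0.0.1:8787").await?;

    // Liveness: true whenever the server can answer at all.
    let live = client.server_live(ServerLiveRequest {}).await?.into_inner().live;

    // Readiness: true once at least one model is registered.
    let ready = client.server_ready(ServerReadyRequest {}).await?.into_inner().ready;

    // Per-model readiness; an empty version lets the server pick one.
    let model_ready = client
        .model_ready(ModelReadyRequest {
            name: "my-model".to_string(),
            version: String::new(),
        })
        .await?
        .into_inner()
        .ready;

    println!("live={live} ready={ready} model_ready={model_ready}");
    Ok(())
}
```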

lib/llm/src/grpc/service/kserve.rs

Lines changed: 100 additions & 15 deletions
```diff
@@ -8,7 +8,7 @@ use crate::grpc::service::kserve::inference::DataType;
 use crate::grpc::service::kserve::inference::ModelInput;
 use crate::grpc::service::kserve::inference::ModelOutput;
 use crate::http::service::Metrics;
-use crate::http::service::metrics;
+use crate::http::service::service_v2 as http_service;
 
 use crate::discovery::ModelManager;
 use crate::local_model::runtime_config::ModelRuntimeConfig;
@@ -42,20 +42,29 @@ use inference::{
 
 use prost::Message;
 
-/// [gluo TODO] 'metrics' are for HTTP service and there is HTTP endpoint
-/// for it as part of HTTP service. Should we always start HTTP service up
-/// for non-inference?
+/// gRPC service state - shares metrics with HTTP service for unified metrics collection
 pub struct State {
     metrics: Arc<Metrics>,
     manager: Arc<ModelManager>,
 }
 
+#[derive(Default, Builder)]
+#[builder(
+    pattern = "owned",
+    build_fn(private, name = "build_internal"),
+    name = "StateBuilder",
+    vis = "pub"
+)]
+pub(crate) struct StateConfig {
+    #[builder(default, setter(strip_option))]
+    metrics: Option<Arc<Metrics>>,
+    #[builder(default, setter(strip_option))]
+    manager: Option<Arc<ModelManager>>,
+}
+
 impl State {
-    pub fn new(manager: Arc<ModelManager>) -> Self {
-        Self {
-            manager,
-            metrics: Arc::new(Metrics::default()),
-        }
+    pub fn builder() -> StateBuilder {
+        StateBuilder::default()
     }
 
     /// Get the Prometheus [`Metrics`] object which tracks request counts and inflight requests
@@ -76,11 +85,29 @@ impl State {
     }
 }
 
+impl StateBuilder {
+    pub fn build(self) -> Result<State, anyhow::Error> {
+        let config = self.build_internal()?;
+
+        Ok(State {
+            manager: config
+                .manager
+                .unwrap_or_else(|| Arc::new(ModelManager::new())),
+            metrics: config
+                .metrics
+                .unwrap_or_else(|| Arc::new(Metrics::default())),
+        })
+    }
+}
+
 #[derive(Clone)]
 pub struct KserveService {
     // The state we share with every request handler
     state: Arc<State>,
 
+    // HTTP service for metrics endpoint
+    http_service: http_service::HttpService,
+
     port: u16,
     host: String,
     request_template: Option<RequestTemplate>,
@@ -97,6 +124,12 @@ pub struct KserveServiceConfig {
 
     #[builder(default = "None")]
     request_template: Option<RequestTemplate>,
+
+    #[builder(default = "8788")]
+    http_metrics_port: u16,
+
+    #[builder(setter(into), default = "String::from(\"0.0.0.0\")")]
+    http_metrics_host: String,
 }
 
 impl KserveService {
@@ -116,6 +149,10 @@ impl KserveService {
         self.state().manager()
     }
 
+    pub fn http_service(&self) -> &http_service::HttpService {
+        &self.http_service
+    }
+
     pub async fn spawn(&self, cancel_token: CancellationToken) -> JoinHandle<Result<()>> {
         let this = self.clone();
         tokio::spawn(async move { this.run(cancel_token).await })
@@ -140,15 +177,29 @@ impl KserveServiceConfigBuilder {
     pub fn build(self) -> Result<KserveService, anyhow::Error> {
         let config: KserveServiceConfig = self.build_internal()?;
 
-        let model_manager = Arc::new(ModelManager::new());
-        let state = Arc::new(State::new(model_manager));
-
-        // enable prometheus metrics
-        let registry = metrics::Registry::new();
-        state.metrics_clone().register(&registry)?;
+        // Create HTTP service with only non-inference endpoints (metrics, health, models list)
+        // This provides the metrics endpoint and shared metrics object
+        let http_service = http_service::HttpService::builder()
+            .port(config.http_metrics_port)
+            .host(config.http_metrics_host.clone())
+            // Disable all inference endpoints - only use for metrics/health
+            .enable_chat_endpoints(false)
+            .enable_cmpl_endpoints(false)
+            .enable_embeddings_endpoints(false)
+            .enable_responses_endpoints(false)
+            .build()?;
+
+        // Share the HTTP service's model manager and metrics object with gRPC state
+        let state = Arc::new(
+            State::builder()
+                .manager(http_service.state().manager_clone())
+                .metrics(http_service.state().metrics_clone())
+                .build()?,
+        );
 
         Ok(KserveService {
             state,
+            http_service,
             port: config.port,
             host: config.host,
             request_template: config.request_template,
@@ -624,4 +675,38 @@ impl GrpcInferenceService for KserveService {
             request_model_name
         )))
     }
+
+    async fn server_live(
+        &self,
+        _request: Request<inference::ServerLiveRequest>,
+    ) -> Result<Response<inference::ServerLiveResponse>, Status> {
+        // server is live if we can respond
+        Ok(Response::new(inference::ServerLiveResponse { live: true }))
+    }
+
+    async fn server_ready(
+        &self,
+        _request: Request<inference::ServerReadyRequest>,
+    ) -> Result<Response<inference::ServerReadyResponse>, Status> {
+        let has_models = !self.state.manager().get_model_cards().is_empty();
+        Ok(Response::new(inference::ServerReadyResponse {
+            ready: has_models,
+        }))
+    }
+
+    async fn model_ready(
+        &self,
+        request: Request<inference::ModelReadyRequest>,
+    ) -> Result<Response<inference::ModelReadyResponse>, Status> {
+        let request_model_name = &request.into_inner().name;
+        let is_ready = self
+            .state
+            .manager()
+            .get_model_cards()
+            .into_iter()
+            .any(|card| request_model_name == &card.display_name);
+        Ok(Response::new(inference::ModelReadyResponse {
+            ready: is_ready,
+        }))
+    }
 }
```
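
The `StateBuilder` introduced above follows a useful `derive_builder` idiom: the generated build function is made private and renamed `build_internal`, then a hand-written `build` wraps it to supply shared defaults. A simplified standalone sketch of the same idiom (`Metrics` is a stand-in type; assumes the `derive_builder` and `anyhow` crates):

```rust
use std::sync::Arc;
use derive_builder::Builder;

#[derive(Debug, Default)]
struct Metrics;

#[derive(Default, Builder)]
#[builder(
    pattern = "owned",
    build_fn(private, name = "build_internal"),
    name = "StateBuilder"
)]
struct StateConfig {
    #[builder(default, setter(strip_option))]
    metrics: Option<Arc<Metrics>>,
}

struct State {
    metrics: Arc<Metrics>,
}

impl StateBuilder {
    fn build(self) -> anyhow::Result<State> {
        let config = self.build_internal()?;
        Ok(State {
            // Fall back to a fresh Metrics when none was shared in.
            metrics: config.metrics.unwrap_or_else(|| Arc::new(Metrics::default())),
        })
    }
}

fn main() -> anyhow::Result<()> {
    // Standalone: gets its own Metrics instance.
    let _standalone = StateBuilder::default().build()?;

    // Shared: reuses an existing Metrics object, as the gRPC state does
    // with the HTTP service's metrics.
    let shared = Arc::new(Metrics::default());
    let linked = StateBuilder::default().metrics(shared.clone()).build()?;
    assert!(Arc::ptr_eq(&linked.metrics, &shared));
    Ok(())
}
```

This is why the diff can default `manager` and `metrics` independently: each field stays optional at build time, and the wrapper decides whether to share an existing object or create a fresh one.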

lib/llm/src/grpc/service/tensor.rs

Lines changed: 12 additions & 15 deletions
```diff
@@ -15,11 +15,11 @@ use crate::types::Annotated;
 use super::kserve;
 
 // [gluo NOTE] These are common utilities that should be shared between frontends
+use crate::http::service::metrics::InflightGuard;
 use crate::http::service::{
     disconnect::{ConnectionHandle, create_connection_monitor},
-    metrics::{Endpoint, ResponseMetricCollector},
+    metrics::{Endpoint, process_response_and_observe_metrics},
 };
-use crate::{http::service::metrics::InflightGuard, preprocessor::LLMMetricAnnotation};
 
 use crate::protocols::tensor;
 use crate::protocols::tensor::{
@@ -76,6 +76,8 @@ pub async fn tensor_response_stream(
         .get_tensor_engine(model)
         .map_err(|_| Status::not_found("model not found"))?;
 
+    let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
+
     let inflight_guard =
         state
             .metrics_clone()
@@ -115,9 +117,15 @@ pub async fn tensor_response_stream(
     // apply any annotations to the front of the stream
     let stream = stream::iter(annotations).chain(stream);
 
-    // Tap on the stream to collect response metrics
+    // Tap on the stream to collect response metrics and handle http_queue_guard
+    let mut http_queue_guard = Some(http_queue_guard);
     let stream = stream.inspect(move |response| {
-        process_metrics_only(response, &mut response_collector);
+        // Calls observe_response() on each token - drops http_queue_guard on first token
+        process_response_and_observe_metrics(
+            response,
+            &mut response_collector,
+            &mut http_queue_guard,
+        );
     });
 
     let stream = grpc_monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
@@ -170,17 +178,6 @@ pub fn grpc_monitor_for_disconnects<T>(
     }
 }
 
-fn process_metrics_only<T>(
-    annotated: &Annotated<T>,
-    response_collector: &mut ResponseMetricCollector,
-) {
-    // update metrics
-    if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
-        response_collector.observe_current_osl(metrics.output_tokens);
-        response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
-    }
-}
-
 /// Get the request ID from a primary source, or lastly create a new one if not present
 fn get_or_create_request_id(primary: Option<&str>) -> String {
     // Try to get the request ID from the primary source
```
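
The `http_queue_guard` handling relies on a small Rust idiom: park an RAII guard in an `Option` captured by the `inspect` closure and `take()` it on the first item, so the guard's `Drop` runs exactly once when the first token arrives. A minimal sketch (`HttpQueueGuard` is a stand-in for the real metrics guard; assumes the `tokio` and `futures` crates):

```rust
use futures::{StreamExt, stream};

struct HttpQueueGuard;

impl Drop for HttpQueueGuard {
    fn drop(&mut self) {
        // The real guard would decrement an "in HTTP queue" gauge here.
        println!("left queue: first token observed");
    }
}

#[tokio::main]
async fn main() {
    let mut http_queue_guard = Some(HttpQueueGuard);
    let responses = stream::iter(["tok1", "tok2", "tok3"]);

    // inspect() taps each item without consuming the stream; take() empties
    // the Option on the first item, dropping the guard exactly once.
    let tapped = responses.inspect(move |tok| {
        if let Some(guard) = http_queue_guard.take() {
            drop(guard);
        }
        println!("observed {tok}");
    });

    tapped.collect::<Vec<_>>().await;
}
```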
