Commit 424d2da

refactor: remove legacy NATS metrics reporting infrastructure
Remove all NATS metrics collection and reporting code from the runtime:

- Remove NATS metric constants (nats_client, nats_service modules)
- Remove ComponentNatsServerPrometheusMetrics struct and impl
- Remove DRTNatsClientPrometheusMetrics struct and impl
- Remove nats_metrics_worker background task
- Remove NATS metrics tests
- Remove NATS metrics helper functions
- Remove stats_handler infrastructure (EndpointStatsHandler, service.rs)
- Update tests to not filter NATS metrics
- Update runtime examples to remove stats_handler usage
- Clarify distributed_runtime.md documentation

This removes ~1200 lines of code while maintaining all passing tests. The system no longer collects or reports NATS client/service metrics.

Fix assert_eq! argument order in metrics tests

The assert_eq! macro arguments were swapped relative to the error message template. Fixed by swapping 'Expected' and 'Actual' labels in the error message to match the actual order of arguments (actual, expected).

Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
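
As a rough illustration of the assert_eq! fix described in the commit message, the sketch below shows a failure message whose labels follow the argument order (actual, expected). The helper name `assert_metrics_match` and its message layout are hypothetical and are not taken from the repository; the real tests may format their messages differently.

```rust
/// Hypothetical test helper (not from the repository): compares two rendered
/// metrics strings and labels them in the same order they are passed in.
fn assert_metrics_match(actual: &str, expected: &str) {
    // assert_eq! compares (left, right). Here left is the actual value and
    // right is the expected value, so the custom message lists "Actual"
    // first and "Expected" second to match the argument order.
    assert_eq!(
        actual, expected,
        "\nActual:\n{}\nExpected:\n{}",
        actual, expected
    );
}
```

With the labels in this order, a failing comparison prints the observed text under "Actual" and the reference text under "Expected", so the message can be read without guessing which side is which.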
1 parent 7e499b5 commit 424d2da

15 files changed: +31 -1214 lines changed

docs/design_docs/distributed_runtime.md

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ The hierarchy and naming in etcd and NATS may change over time, and this documen
 
 For etcd, it also creates a primary lease and spin up a background task to keep the lease alive. All objects registered under this `DistributedRuntime` use this lease_id to maintain their life cycle. There is also a cancellation token that is tied to the primary lease. When the cancellation token is triggered or the background task failed, the primary lease is revoked or expired and the kv pairs stored with this lease_id is removed.
 - `Namespace`: `Namespace`s are primarily a logical grouping mechanism and is not registered in etcd. It provides the root path for all components under this `Namespace`.
-- `Component`: When a `Component` object is created, similar to `Namespace`, it isn't be registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` for metrics and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`.
+- `Component`: When a `Component` object is created, similar to `Namespace`, it isn't be registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` as the service identifier and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`.
 - `Endpoint`: When an Endpoint object is created and started, it performs two key registrations:
   - NATS Registration: The endpoint is registered with the NATS service group created during service creation. The endpoint is assigned a unique subject following the naming: `{namespace_name}.{service_name}.{endpoint_name}-{lease_id_hex}`.
   - etcd Registration: The endpoint information is stored in etcd at a path following the naming: `/services/{namespace}/{component}/{endpoint}-{lease_id}`. Note that the endpoints of different workers of the same type (i.e., two `VllmPrefillWorker`s in one deployment) share the same `Namespace`, `Component`, and `Endpoint` name. They are distinguished by their different primary `lease_id` of their `DistributedRuntime`.
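
The two naming templates quoted in this hunk can be sketched as plain string formatting. This is only an illustration of the templates as written in the document: the function names `nats_subject` and `etcd_key` are hypothetical, the `i64` lease type is an assumption, and because the document does not say how `{lease_id}` is rendered in the etcd path, the sketch takes it as a pre-rendered string.

```rust
/// Hypothetical helper: builds a subject following the documented template
/// `{namespace_name}.{service_name}.{endpoint_name}-{lease_id_hex}`.
/// The i64 lease type is an assumption made for this sketch.
fn nats_subject(namespace: &str, service: &str, endpoint: &str, lease_id: i64) -> String {
    // {:x} renders the lease id as lowercase hexadecimal.
    format!("{}.{}.{}-{:x}", namespace, service, endpoint, lease_id)
}

/// Hypothetical helper: builds a key following the documented template
/// `/services/{namespace}/{component}/{endpoint}-{lease_id}`.
/// The encoding of the lease id is not stated in the document, so the
/// caller supplies it already rendered as text.
fn etcd_key(namespace: &str, component: &str, endpoint: &str, lease_id: &str) -> String {
    format!("/services/{}/{}/{}-{}", namespace, component, endpoint, lease_id)
}
```

Two workers that share namespace, component, and endpoint names but hold different leases get different subjects and keys under these templates, which is the distinction the paragraph above describes.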

lib/runtime/examples/service_metrics/README.md

Lines changed: 1 addition & 7 deletions
@@ -27,13 +27,7 @@ Annotated { data: Some("o"), id: None, event: None, comment: None }
 Annotated { data: Some("r"), id: None, event: None, comment: None }
 Annotated { data: Some("l"), id: None, event: None, comment: None }
 Annotated { data: Some("d"), id: None, event: None, comment: None }
-ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "data": Object {"val": Number(10)}, "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] }
-```
-
-Note the following stats in the output demonstrate the custom
-`stats_handler` attached to the service in `server.rs` is being invoked:
-```
-data: Some(Metrics(Object {..., "data": Object {"val": Number(10)}, ...)
+ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] }
 ```
 
 If you start two copies of the server, you will see two entries being emitted.

lib/runtime/examples/service_metrics/src/bin/service_server.rs

Lines changed: 1 addition & 6 deletions
@@ -1,7 +1,7 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
 
-use service_metrics::{DEFAULT_NAMESPACE, MyStats};
+use service_metrics::DEFAULT_NAMESPACE;
 
 use dynamo_runtime::{
     DistributedRuntime, Runtime, Worker, logging,
@@ -63,11 +63,6 @@ async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> {
     component
         .endpoint("generate")
         .endpoint_builder()
-        .stats_handler(|stats| {
-            println!("stats: {:?}", stats);
-            let stats = MyStats { val: 10 };
-            serde_json::to_value(stats).unwrap()
-        })
         .handler(ingress)
         .start()
         .await
Lines changed: 0 additions & 8 deletions
@@ -1,12 +1,4 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
 
-use serde::{Deserialize, Serialize};
-
 pub const DEFAULT_NAMESPACE: &str = "dynamo";
-
-#[derive(Serialize, Deserialize)]
-// Dummy Stats object to demonstrate how to attach a custom stats handler
-pub struct MyStats {
-    pub val: u32,
-}

lib/runtime/examples/system_metrics/src/lib.rs

Lines changed: 1 addition & 18 deletions
@@ -18,13 +18,6 @@ pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
 pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
 pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
 
-/// Stats structure returned by the endpoint's stats handler
-#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
-pub struct MyStats {
-    // Example value for demonstration purposes
-    pub val: i32,
-}
-
 /// Custom metrics for system stats with data bytes tracking
 #[derive(Clone, Debug)]
 pub struct MySystemStatsMetrics {
@@ -103,17 +96,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> an
     // Use the factory pattern - single line factory call with metrics
     let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?;
 
-    endpoint
-        .endpoint_builder()
-        .stats_handler(|_stats| {
-            println!("Stats handler called with stats: {:?}", _stats);
-            // TODO(keivenc): return a real stats object
-            let stats = MyStats { val: 10 };
-            serde_json::to_value(stats).unwrap()
-        })
-        .handler(ingress)
-        .start()
-        .await?;
+    endpoint.endpoint_builder().handler(ingress).start().await?;
 
     Ok(())
 }

lib/runtime/src/component.rs

Lines changed: 0 additions & 10 deletions
@@ -43,7 +43,6 @@ use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, util
 
 use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
 use crate::protocols::EndpointId;
-use crate::service::ComponentNatsServerPrometheusMetrics;
 use async_nats::{
     rustls::quic,
     service::{Service, ServiceExt},
@@ -52,7 +51,6 @@ use derive_builder::Builder;
 use derive_getters::Getters;
 use educe::Educe;
 use serde::{Deserialize, Serialize};
-use service::EndpointStatsHandler;
 use std::{collections::HashMap, hash::Hash, sync::Arc};
 use validator::{Validate, ValidationError};
 
@@ -62,7 +60,6 @@ mod component;
 mod endpoint;
 mod namespace;
 mod registry;
-pub mod service;
 
 pub use client::Client;
 pub use endpoint::build_transport_type;
@@ -79,8 +76,6 @@ pub enum TransportType {
 #[derive(Default)]
 pub struct RegistryInner {
     pub(crate) services: HashMap<String, Service>,
-    pub(crate) stats_handlers:
-        HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
 }
 
 #[derive(Clone)]
@@ -279,11 +274,6 @@ impl ComponentBuilder {
 
     pub fn build(self) -> Result<Component, anyhow::Error> {
         let component = self.build_internal()?;
-        // If this component is using NATS, gather it's metrics
-        let drt = component.drt();
-        if drt.request_plane().is_nats() {
-            drt.start_stats_service(component.clone());
-        }
         Ok(component)
     }
 }

lib/runtime/src/component/endpoint.rs

Lines changed: 3 additions & 43 deletions
@@ -4,14 +4,13 @@
 use std::sync::Arc;
 
 use anyhow::Result;
-pub use async_nats::service::endpoint::Stats as EndpointStats;
 use derive_builder::Builder;
 use derive_getters::Dissolve;
 use educe::Educe;
 use tokio_util::sync::CancellationToken;
 
 use crate::{
-    component::{Endpoint, Instance, TransportType, service::EndpointStatsHandler},
+    component::{Endpoint, Instance, TransportType},
     distributed::RequestPlaneMode,
     pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
     protocols::EndpointId,
@@ -30,11 +29,6 @@ pub struct EndpointConfig {
     #[educe(Debug(ignore))]
     handler: Arc<dyn PushWorkHandler>,
 
-    /// Stats handler
-    #[educe(Debug(ignore))]
-    #[builder(default, private)]
-    _stats_handler: Option<EndpointStatsHandler>,
-
     /// Additional labels for metrics
     #[builder(default, setter(into))]
     metrics_labels: Option<Vec<(String, String)>>,
@@ -56,13 +50,6 @@ impl EndpointConfigBuilder {
         Self::default().endpoint(endpoint)
     }
 
-    pub fn stats_handler<F>(self, handler: F) -> Self
-    where
-        F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
-    {
-        self._stats_handler(Some(Box::new(handler)))
-    }
-
     /// Register an async engine in the local endpoint registry for direct in-process calls
     pub fn register_local_engine(
         self,
@@ -80,46 +67,19 @@ impl EndpointConfigBuilder {
     }
 
     pub async fn start(self) -> Result<()> {
-        let (
-            endpoint,
-            handler,
-            stats_handler,
-            metrics_labels,
-            graceful_shutdown,
-            health_check_payload,
-        ) = self.build_internal()?.dissolve();
+        let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
+            self.build_internal()?.dissolve();
         let connection_id = endpoint.drt().connection_id();
         let endpoint_id = endpoint.id();
 
         tracing::debug!("Starting endpoint: {endpoint_id}");
 
-        let service_name = endpoint.component.service_name();
-
         let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
             .as_ref()
             .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
         // Add metrics to the handler. The endpoint provides additional information to the handler.
         handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
 
-        // Insert the stats handler. depends on NATS.
-        if let Some(stats_handler) = stats_handler {
-            let registry = endpoint.drt().component_registry().inner.lock().await;
-            let handler_map = registry
-                .stats_handlers
-                .get(&service_name)
-                .cloned()
-                .expect("no stats handler registry; this is unexpected");
-            // There is something wrong with the stats handler map I think.
-            // Here the connection_id is included, but in component/service.rs add_stats_service it uses service_name,
-            // no connection id so it's per-endpoint not per-instance. Doesn't match.
-            // To not block current refactor I am keeping previous behavior, but I think needs
-            // investigation.
-            handler_map.lock().insert(
-                nats::instance_subject(&endpoint_id, connection_id),
-                stats_handler,
-            );
-        }
-
         // This creates a child token of the runtime's endpoint_shutdown_token. That token is
         // cancelled first as part of graceful shutdown. See Runtime::shutdown.
         let endpoint_shutdown_token = endpoint.drt().child_token();

lib/runtime/src/component/service.rs

Lines changed: 0 additions & 60 deletions
This file was deleted.
