Skip to content

Commit 7281dd9

Browse files
craig[bot] and Nukitt committed
Merge #153651
153651: rpc, server: add DRPC metrics server interceptor r=cthumuluru-crdb a=Nukitt Previously, DRPC had no metrics interceptor, so DRPC request metrics could not be captured. This PR adds a metrics server interceptor to DRPC and reuses the `rpc_server_request_duration_nanos` metric for both gRPC and DRPC, since the metric reflects server-side duration regardless of transport, and the interceptor chains are not expected to differ materially in practice. When the two need to be differentiated, the metric can be correlated with other metadata. This way we can observe both gRPC and DRPC latency through interceptors. Since this metric is gated behind a cluster setting, we also renamed that setting's key to `server.rpc.request_metrics.enabled` so that it applies to both RPC frameworks, and kept the old key, `server.grpc.request_metrics.enabled`, as its retired name (alias). Epic: CRDB-49359 Fixes: #144373 Release note: None Co-authored-by: Nukitt <nukit.tailor@cockroachlabs.com>
2 parents 2e2f525 + c561d21 commit 7281dd9

File tree

11 files changed

+116
-51
lines changed

11 files changed

+116
-51
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7198,7 +7198,7 @@ layers:
71987198
derivative: NONE
71997199
- name: rpc.server.request.duration.nanos
72007200
exported_name: rpc_server_request_duration_nanos
7201-
description: Duration of an grpc request in nanoseconds.
7201+
description: Duration of an RPC request in nanoseconds.
72027202
y_axis_label: Duration
72037203
type: HISTOGRAM
72047204
unit: NANOSECONDS

pkg/rpc/drpc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,12 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR
222222
// Recover from any uncaught panics caused by DB Console requests.
223223
unaryInterceptors = append(unaryInterceptors, DRPCGatewayRequestRecoveryInterceptor)
224224

225+
// If the metrics interceptor is set, it should be registered second so
226+
// that all other interceptors are included in the response time durations.
227+
if o.drpcRequestMetricsInterceptor != nil {
228+
unaryInterceptors = append(unaryInterceptors, drpcmux.UnaryServerInterceptor(o.drpcRequestMetricsInterceptor))
229+
}
230+
225231
if !rpcCtx.ContextOptions.Insecure {
226232
a := kvAuth{
227233
sv: &rpcCtx.Settings.SV,

pkg/rpc/metrics.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ over this connection.
207207
}
208208
metaRequestDuration = metric.Metadata{
209209
Name: "rpc.server.request.duration.nanos",
210-
Help: "Duration of an grpc request in nanoseconds.",
210+
Help: "Duration of an RPC request in nanoseconds.",
211211
Measurement: "Duration",
212212
Unit: metric.Unit_NANOSECONDS,
213213
MetricType: prometheusgo.MetricType_HISTOGRAM,
@@ -449,6 +449,7 @@ func NewRequestMetrics() *RequestMetrics {
449449
}
450450

451451
type RequestMetricsInterceptor grpc.UnaryServerInterceptor
452+
type DRPCRequestMetricsInterceptor drpcmux.UnaryServerInterceptor
452453

453454
// NewRequestMetricsInterceptor creates a new gRPC server interceptor that records
454455
// the duration of each RPC. The metric is labeled by the method name and the
@@ -485,6 +486,41 @@ func NewRequestMetricsInterceptor(
485486
}
486487
}
487488

489+
// NewDRPCRequestMetricsInterceptor creates a new DRPC server interceptor that records
490+
// the duration of each RPC. The metric is labeled by the method name and the
491+
// status code of the RPC. The interceptor will only record durations if
492+
// shouldRecord returns true. Otherwise, this interceptor will be a no-op.
493+
func NewDRPCRequestMetricsInterceptor(
494+
requestMetrics *RequestMetrics, shouldRecord func(rpc string) bool,
495+
) DRPCRequestMetricsInterceptor {
496+
return func(
497+
ctx context.Context,
498+
req any,
499+
rpc string,
500+
handler drpcmux.UnaryHandler,
501+
) (any, error) {
502+
if !shouldRecord(rpc) {
503+
return handler(ctx, req)
504+
}
505+
startTime := timeutil.Now()
506+
resp, err := handler(ctx, req)
507+
duration := timeutil.Since(startTime)
508+
var code codes.Code
509+
if err != nil {
510+
// TODO(server): use drpc status code
511+
code = status.Code(err)
512+
} else {
513+
code = codes.OK
514+
}
515+
516+
requestMetrics.Duration.Observe(map[string]string{
517+
RpcMethodLabel: rpc,
518+
RpcStatusCodeLabel: code.String(),
519+
}, float64(duration.Nanoseconds()))
520+
return resp, err
521+
}
522+
}
523+
488524
// MarkGatewayRequest returns a grpc metadata object that contains the
489525
// gwRequestKey field. This is used by the gRPC gateway that forwards HTTP
490526
// requests to their respective gRPC handlers. See gatewayRequestRecoveryInterceptor below.

pkg/rpc/metrics_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestServerRequestInstrumentInterceptor(t *testing.T) {
134134
if tc.shouldRecord {
135135
expectedCount = 1
136136
}
137-
assertGrpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{
137+
assertRpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{
138138
fmt.Sprintf("%s %s", tc.methodName, tc.statusCode): expectedCount,
139139
})
140140
})
@@ -209,7 +209,7 @@ func TestGatewayRequestRecoveryInterceptor(t *testing.T) {
209209
})
210210
}
211211

212-
func assertGrpcMetrics(
212+
func assertRpcMetrics(
213213
t *testing.T, metrics []*io_prometheus_client.Metric, expected map[string]uint64,
214214
) {
215215
t.Helper()

pkg/rpc/settings.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,9 @@ var sourceAddr = func() net.Addr {
122122
}()
123123

124124
type serverOpts struct {
125-
interceptor func(fullMethod string) error
126-
metricsInterceptor RequestMetricsInterceptor
125+
interceptor func(fullMethod string) error
126+
metricsInterceptor RequestMetricsInterceptor
127+
drpcRequestMetricsInterceptor DRPCRequestMetricsInterceptor
127128
}
128129

129130
// ServerOption is a configuration option passed to NewServer.
@@ -153,3 +154,10 @@ func WithMetricsServerInterceptor(interceptor RequestMetricsInterceptor) ServerO
153154
opts.metricsInterceptor = interceptor
154155
}
155156
}
157+
158+
// WithDRPCMetricsServerInterceptor adds a DRPCRequestMetricsInterceptor to the DRPC server.
159+
func WithDRPCMetricsServerInterceptor(interceptor DRPCRequestMetricsInterceptor) ServerOption {
160+
return func(opts *serverOpts) {
161+
opts.drpcRequestMetricsInterceptor = interceptor
162+
}
163+
}

pkg/server/drpc_server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ type drpcServer struct {
3030

3131
// newDRPCServer creates and configures a new drpcServer instance. It enables
3232
// DRPC if the experimental setting is on, otherwise returns a dummy server.
33-
func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) {
33+
func newDRPCServer(
34+
ctx context.Context, rpcCtx *rpc.Context, requestMetrics *rpc.RequestMetrics,
35+
) (*drpcServer, error) {
3436
drpcServer := &drpcServer{}
3537
drpcServer.setMode(modeInitializing)
3638

@@ -41,7 +43,11 @@ func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error
4143
func(path string) error {
4244
return drpcServer.intercept(path)
4345
}),
44-
)
46+
rpc.WithDRPCMetricsServerInterceptor(
47+
rpc.NewDRPCRequestMetricsInterceptor(requestMetrics, func(method string) bool {
48+
return shouldRecordRequestDuration(rpcCtx.Settings, method)
49+
}),
50+
))
4551
if err != nil {
4652
return nil, err
4753
}

pkg/server/grpc_server.go

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,9 @@ package server
77

88
import (
99
"context"
10-
"strings"
1110

1211
"github.com/cockroachdb/cockroach/pkg/rpc"
1312
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
14-
"github.com/cockroachdb/cockroach/pkg/settings"
15-
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
16-
"github.com/cockroachdb/cockroach/pkg/util/metric"
1713
"github.com/cockroachdb/errors"
1814
"google.golang.org/grpc"
1915
"google.golang.org/grpc/codes"
@@ -30,12 +26,10 @@ type grpcServer struct {
3026
}
3127

3228
func newGRPCServer(
33-
ctx context.Context, rpcCtx *rpc.Context, metricsRegistry *metric.Registry,
29+
ctx context.Context, rpcCtx *rpc.Context, requestMetrics *rpc.RequestMetrics,
3430
) (*grpcServer, error) {
3531
s := &grpcServer{}
3632
s.mode.set(modeInitializing)
37-
requestMetrics := rpc.NewRequestMetrics()
38-
metricsRegistry.AddMetricStruct(requestMetrics)
3933
srv, interceptorInfo, err := rpc.NewServerEx(
4034
ctx, rpcCtx, rpc.WithInterceptor(func(path string) error {
4135
return s.intercept(path)
@@ -67,32 +61,3 @@ func (s *grpcServer) health(ctx context.Context) error {
6761
return srverrors.ServerError(ctx, errors.Newf("unknown mode: %v", sm))
6862
}
6963
}
70-
71-
// NewWaitingForInitError creates an error indicating that the server cannot run
72-
// the specified method until the node has been initialized.
73-
func NewWaitingForInitError(methodName string) error {
74-
// NB: this error string is sadly matched in grpcutil.IsWaitingForInit().
75-
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
76-
}
77-
78-
const (
79-
serverPrefix = "/cockroach.server"
80-
tsdbPrefix = "/cockroach.ts"
81-
)
82-
83-
// serverGRPCRequestMetricsEnabled is a cluster setting that enables the
84-
// collection of gRPC request duration metrics. This uses export only
85-
// metrics so the metrics are only exported to external sources such as
86-
// /_status/vars and DataDog.
87-
var serverGRPCRequestMetricsEnabled = settings.RegisterBoolSetting(
88-
settings.ApplicationLevel,
89-
"server.grpc.request_metrics.enabled",
90-
"enables the collection of grpc metrics",
91-
false,
92-
)
93-
94-
func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool {
95-
return serverGRPCRequestMetricsEnabled.Get(&settings.SV) &&
96-
(strings.HasPrefix(method, serverPrefix) ||
97-
strings.HasPrefix(method, tsdbPrefix))
98-
}

pkg/server/grpc_server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestRequestMetricRegistered(t *testing.T) {
4343

4444
_, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{})
4545
require.Len(t, histogramVec.ToPrometheusMetrics(), 0, "Should not have recorded any metrics yet")
46-
serverGRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true)
46+
serverRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true)
4747
_, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{})
4848
require.Len(t, histogramVec.ToPrometheusMetrics(), 1, "Should have recorded metrics for request")
4949
}
@@ -67,7 +67,7 @@ func TestShouldRecordRequestDuration(t *testing.T) {
6767
settings := cluster.MakeTestingClusterSettings()
6868
for _, tt := range tests {
6969
t.Run(tt.methodName, func(t *testing.T) {
70-
serverGRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled)
70+
serverRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled)
7171
require.Equal(t, tt.expected, shouldRecordRequestDuration(settings, tt.methodName))
7272
})
7373
}

pkg/server/serve_mode.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,15 @@
55

66
package server
77

8-
import "sync/atomic"
8+
import (
9+
"strings"
10+
"sync/atomic"
11+
12+
"github.com/cockroachdb/cockroach/pkg/settings"
13+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
14+
"google.golang.org/grpc/codes"
15+
grpcstatus "google.golang.org/grpc/status"
16+
)
917

1018
type serveModeHandler struct {
1119
mode serveMode
@@ -61,3 +69,33 @@ func (s *serveMode) set(mode serveMode) {
6169
func (s *serveMode) get() serveMode {
6270
return serveMode(atomic.LoadInt32((*int32)(s)))
6371
}
72+
73+
const (
74+
serverPrefix = "/cockroach.server"
75+
tsdbPrefix = "/cockroach.ts"
76+
)
77+
78+
// serverRPCRequestMetricsEnabled is a cluster setting that enables the
79+
// collection of gRPC and DRPC request duration metrics. This uses export only
80+
// metrics so the metrics are only exported to external sources such as
81+
// /_status/vars and DataDog.
82+
var serverRPCRequestMetricsEnabled = settings.RegisterBoolSetting(
83+
settings.ApplicationLevel,
84+
"server.rpc.request_metrics.enabled",
85+
"enables the collection of rpc metrics",
86+
false, /* defaultValue */
87+
settings.WithRetiredName("server.grpc.request_metrics.enabled"),
88+
)
89+
90+
func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool {
91+
return serverRPCRequestMetricsEnabled.Get(&settings.SV) &&
92+
(strings.HasPrefix(method, serverPrefix) ||
93+
strings.HasPrefix(method, tsdbPrefix))
94+
}
95+
96+
// NewWaitingForInitError creates an error indicating that the server cannot run
97+
// the specified method until the node has been initialized.
98+
func NewWaitingForInitError(methodName string) error {
99+
// NB: this error string is sadly matched in grpcutil.IsWaitingForInit().
100+
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
101+
}

pkg/server/server.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,12 +406,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
406406
// and after ValidateAddrs().
407407
rpcContext.CheckCertificateAddrs(ctx)
408408

409-
grpcServer, err := newGRPCServer(ctx, rpcContext, appRegistry)
409+
requestMetrics := rpc.NewRequestMetrics()
410+
appRegistry.AddMetricStruct(requestMetrics)
411+
412+
grpcServer, err := newGRPCServer(ctx, rpcContext, requestMetrics)
410413
if err != nil {
411414
return nil, err
412415
}
413416

414-
drpcServer, err := newDRPCServer(ctx, rpcContext)
417+
drpcServer, err := newDRPCServer(ctx, rpcContext, requestMetrics)
415418
if err != nil {
416419
return nil, err
417420
}

0 commit comments

Comments
 (0)