Skip to content

Commit 4cbab26

Browse files
committed
drpc, server: add request counter telemetry in DRPC-gateway
grpc-Gateway provides basic request counting via telemetry.Inc(getServerEndpointCounter(method)). This patch adds support for request counting in DRPC-gateway similar to how gRPC-Gateway provides and will help in production monitoring. Epic: CRDB-48924 Fixes: #151399 Release note: None
1 parent 48e15d6 commit 4cbab26

File tree

5 files changed

+43
-6
lines changed

5 files changed

+43
-6
lines changed

pkg/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ go_library(
4141
"//pkg/security",
4242
"//pkg/security/certnames",
4343
"//pkg/security/username",
44+
"//pkg/server/telemetry",
4445
"//pkg/settings",
4546
"//pkg/settings/cluster",
4647
"//pkg/ts/tspb",

pkg/rpc/context.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,12 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
656656
rpcCtx.clientStreamInterceptorsDRPC = append(rpcCtx.clientStreamInterceptorsDRPC,
657657
drpcinterceptor.StreamClientInterceptor(tracer, tagger))
658658
}
659+
660+
// Add the DRPC gateway request counter interceptor to track telemetry for
661+
// HTTP gateway requests.
662+
rpcCtx.clientUnaryInterceptorsDRPC = append(rpcCtx.clientUnaryInterceptorsDRPC,
663+
drpcGatewayRequestCounterInterceptor)
664+
659665
// Note that we do not consult rpcCtx.Knobs.StreamClientInterceptor. That knob
660666
// can add another interceptor, but it can only do it dynamically, based on
661667
// a connection class. Only calls going over an actual gRPC connection will

pkg/rpc/drpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR
220220
streamInterceptors = append(streamInterceptors, stopStream)
221221

222222
// Recover from any uncaught panics caused by DB Console requests.
223-
unaryInterceptors = append(unaryInterceptors, DRPCGatewayRequestRecoveryInterceptor)
223+
unaryInterceptors = append(unaryInterceptors, drpcGatewayRequestRecoveryInterceptor)
224224

225225
if !rpcCtx.ContextOptions.Insecure {
226226
a := kvAuth{

pkg/rpc/drpc_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func TestGatewayRequestDRPCRecoveryInterceptor(t *testing.T) {
9898
panic("test panic")
9999
}
100100

101-
resp, err := DRPCGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
101+
resp, err := drpcGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
102102

103103
require.Nil(t, resp)
104104
require.ErrorContains(t, err, "unexpected error occurred")
@@ -118,7 +118,7 @@ func TestGatewayRequestDRPCRecoveryInterceptor(t *testing.T) {
118118
}
119119
}()
120120

121-
_, _ = DRPCGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
121+
_, _ = drpcGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
122122
})
123123

124124
// With gateway metadata but no panic - should pass through normally
@@ -131,7 +131,7 @@ func TestGatewayRequestDRPCRecoveryInterceptor(t *testing.T) {
131131
return expectedResp, expectedErr
132132
}
133133

134-
resp, err := DRPCGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
134+
resp, err := drpcGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
135135

136136
require.Equal(t, expectedResp, resp)
137137
require.ErrorIs(t, err, expectedErr)

pkg/rpc/metrics.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ package rpc
77

88
import (
99
"context"
10+
"fmt"
1011
"net/http"
1112
"strings"
1213

1314
"github.com/VividCortex/ewma"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
1517
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
1618
"github.com/cockroachdb/cockroach/pkg/util/metric"
1719
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
@@ -23,6 +25,8 @@ import (
2325
"google.golang.org/grpc/codes"
2426
"google.golang.org/grpc/metadata"
2527
"google.golang.org/grpc/status"
28+
"storj.io/drpc"
29+
"storj.io/drpc/drpcclient"
2630
"storj.io/drpc/drpcmetadata"
2731
"storj.io/drpc/drpcmux"
2832
)
@@ -521,10 +525,10 @@ func MarkDRPCGatewayRequest(ctx context.Context) context.Context {
521525
return drpcmetadata.Add(ctx, gwRequestKey, "true")
522526
}
523527

524-
// DRPCGatewayRequestRecoveryInterceptor recovers from panics in DRPC handlers
528+
// drpcGatewayRequestRecoveryInterceptor recovers from panics in DRPC handlers
525529
// that are invoked due to DB console requests. For these requests, we do not
526530
// want an uncaught panic to crash the node.
527-
func DRPCGatewayRequestRecoveryInterceptor(
531+
func drpcGatewayRequestRecoveryInterceptor(
528532
ctx context.Context, req interface{}, rpc string, handler drpcmux.UnaryHandler,
529533
) (resp interface{}, err error) {
530534
if val, ok := drpcmetadata.GetValue(ctx, gwRequestKey); ok && val != "" {
@@ -538,3 +542,29 @@ func DRPCGatewayRequestRecoveryInterceptor(
538542
resp, err = handler(ctx, req)
539543
return resp, err
540544
}
545+
546+
// drpcGatewayRequestCounterInterceptor is a client-side interceptor that
547+
// increments telemetry counters for DRPC requests originating from the HTTP
548+
// gateway. It checks for the gateway request marker and increments
549+
// a counter named after the RPC method.
550+
func drpcGatewayRequestCounterInterceptor(
551+
ctx context.Context,
552+
rpc string,
553+
enc drpc.Encoding,
554+
in, out drpc.Message,
555+
cc *drpcclient.ClientConn,
556+
invoker drpcclient.UnaryInvoker,
557+
) error {
558+
// Check if this request originated from the DRPC HTTP gateway
559+
if val, ok := drpcmetadata.GetValue(ctx, gwRequestKey); ok && val != "" {
560+
telemetry.Inc(getDRPCGatewayEndpointCounter(rpc))
561+
}
562+
return invoker(ctx, rpc, enc, in, out, cc)
563+
}
564+
565+
// getDRPCGatewayEndpointCounter returns a telemetry Counter corresponding to
566+
// the given DRPC method.
567+
func getDRPCGatewayEndpointCounter(method string) telemetry.Counter {
568+
const counterPrefix = "http.drpc-gateway"
569+
return telemetry.GetCounter(fmt.Sprintf("%s.%s", counterPrefix, method))
570+
}

0 commit comments

Comments
 (0)