Skip to content

Commit ae0b3d7

Browse files
craig[bot]Nukitt
andcommitted
Merge #156034
156034: drpc, server: add request counter telemetry in DRPC-gateway r=cthumuluru-crdb a=Nukitt gRPC-Gateway provides basic request counting via https://github.com/cockroachdb/cockroach/blob/04f49359a573681a93d9724f37b6582b420eaf1b/pkg/server/grpc_gateway.go#L88 This patch adds support for request counting in drpc-gateway similar to how gRPC-Gateway provides and will help in production monitoring. Epic: CRDB-48924 Fixes: #151399 Release note: None Co-authored-by: Nukitt <nukit.tailor@cockroachlabs.com>
2 parents 7281dd9 + 4cbab26 commit ae0b3d7

File tree

5 files changed

+43
-6
lines changed

5 files changed

+43
-6
lines changed

pkg/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ go_library(
4141
"//pkg/security",
4242
"//pkg/security/certnames",
4343
"//pkg/security/username",
44+
"//pkg/server/telemetry",
4445
"//pkg/settings",
4546
"//pkg/settings/cluster",
4647
"//pkg/ts/tspb",

pkg/rpc/context.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,12 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
656656
rpcCtx.clientStreamInterceptorsDRPC = append(rpcCtx.clientStreamInterceptorsDRPC,
657657
drpcinterceptor.StreamClientInterceptor(tracer, tagger))
658658
}
659+
660+
// Add the DRPC gateway request counter interceptor to track telemetry for
661+
// HTTP gateway requests.
662+
rpcCtx.clientUnaryInterceptorsDRPC = append(rpcCtx.clientUnaryInterceptorsDRPC,
663+
drpcGatewayRequestCounterInterceptor)
664+
659665
// Note that we do not consult rpcCtx.Knobs.StreamClientInterceptor. That knob
660666
// can add another interceptor, but it can only do it dynamically, based on
661667
// a connection class. Only calls going over an actual gRPC connection will

pkg/rpc/drpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR
220220
streamInterceptors = append(streamInterceptors, stopStream)
221221

222222
// Recover from any uncaught panics caused by DB Console requests.
223-
unaryInterceptors = append(unaryInterceptors, DRPCGatewayRequestRecoveryInterceptor)
223+
unaryInterceptors = append(unaryInterceptors, drpcGatewayRequestRecoveryInterceptor)
224224

225225
// If the metrics interceptor is set, it should be registered second so
226226
// that all other interceptors are included in the response time durations.

pkg/rpc/drpc_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func TestGatewayRequestDRPCRecoveryInterceptor(t *testing.T) {
9898
panic("test panic")
9999
}
100100

101-
resp, err := DRPCGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
101+
resp, err := drpcGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
102102

103103
require.Nil(t, resp)
104104
require.ErrorContains(t, err, "unexpected error occurred")
@@ -118,7 +118,7 @@ func TestGatewayRequestDRPCRecoveryInterceptor(t *testing.T) {
118118
}
119119
}()
120120

121-
_, _ = DRPCGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
121+
_, _ = drpcGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
122122
})
123123

124124
// With gateway metadata but no panic - should pass through normally
@@ -131,7 +131,7 @@ func TestGatewayRequestDRPCRecoveryInterceptor(t *testing.T) {
131131
return expectedResp, expectedErr
132132
}
133133

134-
resp, err := DRPCGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
134+
resp, err := drpcGatewayRequestRecoveryInterceptor(ctx, nil, "test", handler)
135135

136136
require.Equal(t, expectedResp, resp)
137137
require.ErrorIs(t, err, expectedErr)

pkg/rpc/metrics.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ package rpc
77

88
import (
99
"context"
10+
"fmt"
1011
"net/http"
1112
"strings"
1213

1314
"github.com/VividCortex/ewma"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
1517
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
1618
"github.com/cockroachdb/cockroach/pkg/util/metric"
1719
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
@@ -23,6 +25,8 @@ import (
2325
"google.golang.org/grpc/codes"
2426
"google.golang.org/grpc/metadata"
2527
"google.golang.org/grpc/status"
28+
"storj.io/drpc"
29+
"storj.io/drpc/drpcclient"
2630
"storj.io/drpc/drpcmetadata"
2731
"storj.io/drpc/drpcmux"
2832
)
@@ -557,10 +561,10 @@ func MarkDRPCGatewayRequest(ctx context.Context) context.Context {
557561
return drpcmetadata.Add(ctx, gwRequestKey, "true")
558562
}
559563

560-
// DRPCGatewayRequestRecoveryInterceptor recovers from panics in DRPC handlers
564+
// drpcGatewayRequestRecoveryInterceptor recovers from panics in DRPC handlers
561565
// that are invoked due to DB console requests. For these requests, we do not
562566
// want an uncaught panic to crash the node.
563-
func DRPCGatewayRequestRecoveryInterceptor(
567+
func drpcGatewayRequestRecoveryInterceptor(
564568
ctx context.Context, req interface{}, rpc string, handler drpcmux.UnaryHandler,
565569
) (resp interface{}, err error) {
566570
if val, ok := drpcmetadata.GetValue(ctx, gwRequestKey); ok && val != "" {
@@ -574,3 +578,29 @@ func DRPCGatewayRequestRecoveryInterceptor(
574578
resp, err = handler(ctx, req)
575579
return resp, err
576580
}
581+
582+
// drpcGatewayRequestCounterInterceptor is a client-side interceptor that
583+
// increments telemetry counters for DRPC requests originating from the HTTP
584+
// gateway. It checks for the gateway request marker and increments
585+
// a counter named after the RPC method.
586+
func drpcGatewayRequestCounterInterceptor(
587+
ctx context.Context,
588+
rpc string,
589+
enc drpc.Encoding,
590+
in, out drpc.Message,
591+
cc *drpcclient.ClientConn,
592+
invoker drpcclient.UnaryInvoker,
593+
) error {
594+
// Check if this request originated from the DRPC HTTP gateway
595+
if val, ok := drpcmetadata.GetValue(ctx, gwRequestKey); ok && val != "" {
596+
telemetry.Inc(getDRPCGatewayEndpointCounter(rpc))
597+
}
598+
return invoker(ctx, rpc, enc, in, out, cc)
599+
}
600+
601+
// getDRPCGatewayEndpointCounter returns a telemetry Counter corresponding to
602+
// the given DRPC method.
603+
func getDRPCGatewayEndpointCounter(method string) telemetry.Counter {
604+
const counterPrefix = "http.drpc-gateway"
605+
return telemetry.GetCounter(fmt.Sprintf("%s.%s", counterPrefix, method))
606+
}

0 commit comments

Comments
 (0)