Skip to content

Commit f3113c9

Browse files
authored
Making the QueryRequest Releasable (#7094)
Signed-off-by: alanprot <alanprot@gmail.com>
1 parent c84eb12 commit f3113c9

File tree

4 files changed

+179
-97
lines changed

4 files changed

+179
-97
lines changed

integration/grpc_server_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type mockGprcServer struct {
3535
func (m mockGprcServer) QueryStream(req *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
3636
md, _ := metadata.FromIncomingContext(streamServer.Context())
3737
i, _ := strconv.Atoi(md["i"][0])
38+
defer req.Free()
3839
return streamServer.Send(createStreamResponse(i))
3940
}
4041

@@ -72,7 +73,7 @@ func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest
7273
return &cortexpb.WriteResponse{}, nil
7374
}
7475

75-
func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
76+
func run(t testing.TB, cfg server.Config, register func(s *grpc.Server), validate func(t testing.TB, con *grpc.ClientConn)) {
7677
savedRegistry := prometheus.DefaultRegisterer
7778
prometheus.DefaultRegisterer = prometheus.NewRegistry()
7879
defer func() {
@@ -107,6 +108,7 @@ func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validat
107108

108109
clientConfig := grpcclient.Config{}
109110
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
111+
clientConfig.GRPCCompression = "zstd"
110112

111113
dialOptions, err := clientConfig.DialOption(nil, nil)
112114
assert.NoError(t, err)
@@ -117,22 +119,44 @@ func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validat
117119
validate(t, conn)
118120
}
119121

122+
func BenchmarkGrpcCalls(b *testing.B) {
123+
cfg := server.Config{}
124+
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
125+
register := func(s *grpc.Server) {
126+
d := &mockGprcServer{}
127+
ingester_client.RegisterIngesterServer(s, d)
128+
}
129+
run(b, cfg, register, func(t testing.TB, conn *grpc.ClientConn) {
130+
ctx := context.Background()
131+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(0)}})
132+
client := ingester_client.NewIngesterClient(conn)
133+
b.ReportAllocs()
134+
b.ResetTimer()
135+
for i := 0; i < b.N; i++ {
136+
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
137+
require.NoError(t, err)
138+
_, err = s.Recv()
139+
require.NoError(t, err)
140+
}
141+
})
142+
}
143+
120144
func TestConcurrentGrpcCalls(t *testing.T) {
121145
cfg := server.Config{}
122146
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
123147

124148
tc := map[string]struct {
125149
cfg server.Config
126150
register func(s *grpc.Server)
127-
validate func(t *testing.T, con *grpc.ClientConn)
151+
validate func(t testing.TB, con *grpc.ClientConn)
128152
}{
129153
"distributor": {
130154
cfg: cfg,
131155
register: func(s *grpc.Server) {
132156
d := &mockGprcServer{}
133157
distributorpb.RegisterDistributorServer(s, d)
134158
},
135-
validate: func(t *testing.T, conn *grpc.ClientConn) {
159+
validate: func(t testing.TB, conn *grpc.ClientConn) {
136160
client := distributorpb.NewDistributorClient(conn)
137161
wg := sync.WaitGroup{}
138162
n := 10000
@@ -156,7 +180,7 @@ func TestConcurrentGrpcCalls(t *testing.T) {
156180
d := &mockGprcServer{}
157181
ingester_client.RegisterIngesterServer(s, d)
158182
},
159-
validate: func(t *testing.T, conn *grpc.ClientConn) {
183+
validate: func(t testing.TB, conn *grpc.ClientConn) {
160184
ctx := context.Background()
161185
client := ingester_client.NewIngesterClient(conn)
162186
wg := sync.WaitGroup{}
@@ -188,7 +212,7 @@ func TestConcurrentGrpcCalls(t *testing.T) {
188212
d := &mockGprcServer{}
189213
ingester_client.RegisterIngesterServer(s, d)
190214
},
191-
validate: func(t *testing.T, conn *grpc.ClientConn) {
215+
validate: func(t testing.TB, conn *grpc.ClientConn) {
192216
client := ingester_client.NewIngesterClient(conn)
193217
wg := sync.WaitGroup{}
194218
n := 10000

0 commit comments

Comments
 (0)