diff --git a/api/metrics/client.go b/api/metrics/client.go index e00ae1be1aaa..cbaff955bcfe 100644 --- a/api/metrics/client.go +++ b/api/metrics/client.go @@ -12,6 +12,8 @@ import ( "github.com/prometheus/common/expfmt" + "github.com/ava-labs/avalanchego/utils/rpc" + dto "github.com/prometheus/client_model/go" ) @@ -45,6 +47,7 @@ func (c *Client) GetMetrics(ctx context.Context) (map[string]*dto.MetricFamily, return nil, fmt.Errorf("failed to create request: %w", err) } + //nolint:bodyclose // body is closed via rpc.CleanlyCloseBody in all code paths resp, err := http.DefaultClient.Do(request) if err != nil { return nil, fmt.Errorf("failed to issue request: %w", err) @@ -53,7 +56,7 @@ func (c *Client) GetMetrics(ctx context.Context) (map[string]*dto.MetricFamily, // Return an error for any non successful status code if resp.StatusCode < 200 || resp.StatusCode > 299 { // Drop any error during close to report the original error - _ = resp.Body.Close() + _ = rpc.CleanlyCloseBody(resp.Body) return nil, fmt.Errorf("received status code: %d", resp.StatusCode) } @@ -61,8 +64,9 @@ func (c *Client) GetMetrics(ctx context.Context) (map[string]*dto.MetricFamily, metrics, err := parser.TextToMetricFamilies(resp.Body) if err != nil { // Drop any error during close to report the original error - _ = resp.Body.Close() + _ = rpc.CleanlyCloseBody(resp.Body) return nil, err } - return metrics, resp.Body.Close() + + return metrics, rpc.CleanlyCloseBody(resp.Body) } diff --git a/tests/fixture/tmpnet/check_monitoring.go b/tests/fixture/tmpnet/check_monitoring.go index b306d20a1011..0d0d2d587658 100644 --- a/tests/fixture/tmpnet/check_monitoring.go +++ b/tests/fixture/tmpnet/check_monitoring.go @@ -24,6 +24,7 @@ import ( "github.com/ava-labs/avalanchego/tests/fixture/stacktrace" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/rpc" ) type getCountFunc func() (int, error) @@ -114,7 +115,7 @@ func queryLoki( if err != nil { return 0, stacktrace.Errorf("failed to execute request: %w", err) } - defer resp.Body.Close() + defer func() { _ = rpc.CleanlyCloseBody(resp.Body) }() // Read and parse response body, err := io.ReadAll(resp.Body) diff --git a/tests/fixture/tmpnet/monitor_processes.go b/tests/fixture/tmpnet/monitor_processes.go index b0d891a8b7e4..7a06e04e81a5 100644 --- a/tests/fixture/tmpnet/monitor_processes.go +++ b/tests/fixture/tmpnet/monitor_processes.go @@ -24,6 +24,7 @@ import ( "github.com/ava-labs/avalanchego/tests/fixture/stacktrace" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/perms" + "github.com/ava-labs/avalanchego/utils/rpc" ) const ( @@ -591,7 +592,7 @@ func checkReadiness(ctx context.Context, url string) (bool, string, error) { if err != nil { return false, "", stacktrace.Errorf("request failed: %w", err) } - defer resp.Body.Close() + defer func() { _ = rpc.CleanlyCloseBody(resp.Body) }() body, err := io.ReadAll(resp.Body) if err != nil { diff --git a/tests/fixture/tmpnet/node.go b/tests/fixture/tmpnet/node.go index e572cd4bfdfa..554b9b771abc 100644 --- a/tests/fixture/tmpnet/node.go +++ b/tests/fixture/tmpnet/node.go @@ -24,6 +24,7 @@ import ( "github.com/ava-labs/avalanchego/tests/fixture/stacktrace" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner" + "github.com/ava-labs/avalanchego/utils/rpc" "github.com/ava-labs/avalanchego/vms/platformvm/signer" ) @@ -224,7 +225,8 @@ func (n *Node) SaveMetricsSnapshot(ctx context.Context) error { if err != nil { return stacktrace.Wrap(err) } - defer resp.Body.Close() + defer func() { _ = rpc.CleanlyCloseBody(resp.Body) }() + body, err := io.ReadAll(resp.Body) if err != nil { return stacktrace.Wrap(err) diff --git a/utils/dynamicip/ifconfig_resolver.go b/utils/dynamicip/ifconfig_resolver.go index dccbbcbdc7a0..e8ee7e15104c 100644 --- a/utils/dynamicip/ifconfig_resolver.go +++ b/utils/dynamicip/ifconfig_resolver.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/ava-labs/avalanchego/utils/ips" + "github.com/ava-labs/avalanchego/utils/rpc" ) var _ Resolver = (*ifConfigResolver)(nil) @@ -31,7 +32,7 @@ func (r *ifConfigResolver) Resolve(ctx context.Context) (netip.Addr, error) { if err != nil { return netip.Addr{}, err } - defer resp.Body.Close() + defer func() { _ = rpc.CleanlyCloseBody(resp.Body) }() ipBytes, err := io.ReadAll(resp.Body) if err != nil { diff --git a/utils/rpc/json.go b/utils/rpc/json.go index 62fc90169bbd..0a3e752f6ab7 100644 --- a/utils/rpc/json.go +++ b/utils/rpc/json.go @@ -6,14 +6,25 @@ package rpc import ( "bytes" "context" + "errors" "fmt" + "io" "net/http" "net/url" rpc "github.com/gorilla/rpc/v2/json2" ) +// CleanlyCloseBody avoids sending unnecessary RST_STREAM and PING frames by ensuring +// the whole body is read before being closed. +// See https://blog.cloudflare.com/go-and-enhance-your-calm/#reading-bodies-in-go-can-be-unintuitive +func CleanlyCloseBody(body io.ReadCloser) error { + _, err := io.Copy(io.Discard, body) + return errors.Join(err, body.Close()) +} + func SendJSONRequest( + client client, ctx context.Context, uri *url.URL, method string, @@ -42,7 +53,8 @@ func SendJSONRequest( request.Header = ops.headers request.Header.Set("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(request) + //nolint:bodyclose // body is closed via CleanlyCloseBody in all code paths + resp, err := client.Send(request) if err != nil { return fmt.Errorf("failed to issue request: %w", err) } @@ -50,14 +62,15 @@ func SendJSONRequest( // Return an error for any non successful status code if resp.StatusCode < 200 || resp.StatusCode > 299 { // Drop any error during close to report the original error - _ = resp.Body.Close() + _ = CleanlyCloseBody(resp.Body) return fmt.Errorf("received status code: %d", resp.StatusCode) } if err := rpc.DecodeClientResponse(resp.Body, reply); err != nil { // Drop any error during close to report the original error - _ = resp.Body.Close() + _ = CleanlyCloseBody(resp.Body) return fmt.Errorf("failed to decode client response: %w", err) } - return resp.Body.Close() + + return CleanlyCloseBody(resp.Body) } diff --git a/utils/rpc/requester.go b/utils/rpc/requester.go index ed616b2fdfba..f0c08ce1ab50 100644 --- a/utils/rpc/requester.go +++ b/utils/rpc/requester.go @@ -5,22 +5,28 @@ package rpc import ( "context" + "net/http" "net/url" ) -var _ EndpointRequester = (*avalancheEndpointRequester)(nil) +var ( + _ EndpointRequester = (*avalancheEndpointRequester)(nil) + _ client = (*httpClient)(nil) +) type EndpointRequester interface { SendRequest(ctx context.Context, method string, params interface{}, reply interface{}, options ...Option) error } type avalancheEndpointRequester struct { - uri string + client client + uri string } func NewEndpointRequester(uri string) EndpointRequester { return &avalancheEndpointRequester{ - uri: uri, + client: &httpClient{c: http.DefaultClient}, + uri: uri, } } @@ -37,6 +43,7 @@ func (e *avalancheEndpointRequester) SendRequest( } return SendJSONRequest( + e.client, ctx, uri, method, @@ -45,3 +52,15 @@ func (e *avalancheEndpointRequester) SendRequest( options..., ) } + +type client interface { + Send(req *http.Request) (*http.Response, error) +} + +type httpClient struct { + c *http.Client +} + +func (h httpClient) Send(req *http.Request) (*http.Response, error) { + return h.c.Do(req) +} diff --git a/utils/rpc/requester_test.go b/utils/rpc/requester_test.go new file mode 100644 index 000000000000..9bf91c96c5b6 --- /dev/null +++ b/utils/rpc/requester_test.go @@ -0,0 +1,75 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package rpc + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +var _ client = (*testClient)(nil) + +// TestEndpointRequesterLongResponse tests that [EndpointRequester.SendRequest] +// respects context cancellation when draining a long response +func TestEndpointRequesterLongResponse(t *testing.T) { + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + + _, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":"foobar"}`)) + w.(http.Flusher).Flush() + + // Try to keep sending data even after the client has received the + // response to try to block them while draining the http body. + for { + w.Write([]byte("foo")) + w.(http.Flusher).Flush() + } + }), + ) + + gotResponse := make(chan struct{}) + client := avalancheEndpointRequester{ + client: testClient{ + client: httpClient{ + c: http.DefaultClient, + }, + gotResponse: gotResponse, + }, + uri: server.URL, + } + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + eg := errgroup.Group{} + eg.Go(func() error { + return client.SendRequest(ctx, "foo", nil, new(any)) + }) + + // Block after we receive the response to check that context cancellation is + // respected while draining the response body. + <-gotResponse + cancel() + + err := eg.Wait() + require.ErrorIs(t, err, context.Canceled) +} + +type testClient struct { + client + gotResponse chan struct{} +} + +func (t testClient) Send(req *http.Request) (*http.Response, error) { + response, err := t.client.Send(req) + close(t.gotResponse) + + return response, err +}