From 41894c91db08e1845753efb55e7d8228237079c0 Mon Sep 17 00:00:00 2001 From: Greg May Date: Fri, 28 Nov 2025 10:55:24 -0800 Subject: [PATCH] increase connection logging and retry on failure --- Dockerfile | 2 +- shared/common.go | 147 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 124 insertions(+), 25 deletions(-) diff --git a/Dockerfile b/Dockerfile index 776cbd8..cc7f508 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ RUN apk add --no-cache git build-base # Set working directory WORKDIR /app -ENV GOPRIVATE=gitlab.com/blockdaemon,go.blockdaemon.com/blockdaemon,gitlab.com/Blockdaemon,go.blockdaemon.com +ENV GOPRIVATE=gitlab.com/blockdaemon,go.blockdaemon.com/blockdaemon,gitlab.com/Blockdaemon,go.Blockdaemon.com # Copy go mod files COPY go.mod go.sum ./ diff --git a/shared/common.go b/shared/common.go index bf2d07b..6e371fe 100644 --- a/shared/common.go +++ b/shared/common.go @@ -8,9 +8,11 @@ import ( "log/slog" "math/big" "net/http" + "net/url" "os" "strings" "sync" + "time" mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" @@ -172,44 +174,90 @@ func TimeLogger(whichController string) *log.Logger { } func ConnectToRPC(rpcURL string) (*ethclient.Client, *big.Int) { - Infof(slog.Default(), "Connecting to RPC URL...\n") - // Build dial options and include optional HTTP/WebSocket headers from env - var clientOptions []rpc.ClientOption - clientOptions = append(clientOptions, rpc.WithWebsocketMessageSizeLimit(0)) - - // Support explicit key/value envs first, fallback to legacy RPC_HEADER="Key:Value" - { - var hdr http.Header - if key, val := os.Getenv("RPC_HEADER_KEY"), os.Getenv("RPC_HEADER_VALUE"); key != "" && val != "" { - hdr = http.Header{} - hdr.Add(strings.TrimSpace(key), strings.TrimSpace(val)) - } - if hdr != nil { - clientOptions = append(clientOptions, rpc.WithHeaders(hdr)) - for k := range hdr { - Infof(slog.Default(), "Applied RPC header '%s' from environment\n", k) - } + sanitizedRPCURL := sanitizeRPCURL(rpcURL) + for { + client, chainID, err := dialRPC(rpcURL, sanitizedRPCURL) + if err == nil { + return client, chainID } + + Warnf( + slog.Default(), + "RPC connection failed (%v). Retrying in %s...", + err, + rpcReconnectInterval(), + ) + time.Sleep(rpcReconnectInterval()) } +} + +func dialRPC(rpcURL, sanitizedRPCURL string) (*ethclient.Client, *big.Int, error) { + Infof(slog.Default(), "Connecting to RPC URL %s\n", sanitizedRPCURL) + logProxyConfiguration() + + clientOptions := buildClientOptions() - RpcClient, err := rpc.DialOptions( - context.Background(), + ctx, cancel := context.WithTimeout(context.Background(), rpcDialTimeout()) + defer cancel() + + rpcClient, err := rpc.DialOptions( + ctx, rpcURL, clientOptions..., ) - client := ethclient.NewClient(RpcClient) - if err != nil { - log.Fatal(err) + return nil, nil, fmt.Errorf("dial %s: %w", sanitizedRPCURL, err) } + + client := ethclient.NewClient(rpcClient) + Infof(slog.Default(), "Connected\n") Infof(slog.Default(), "Fetching Chain ID...\n") chainID, err := client.NetworkID(context.Background()) if err != nil { - log.Fatal(err) + return nil, nil, fmt.Errorf("fetch chain ID: %w", err) } Infof(slog.Default(), "Chain ID: %d\n", chainID) - return client, chainID + return client, chainID, nil +} + +func buildClientOptions() []rpc.ClientOption { + var clientOptions []rpc.ClientOption + clientOptions = append(clientOptions, rpc.WithWebsocketMessageSizeLimit(0)) + + if hdr := buildRPCHeaders(); hdr != nil { + clientOptions = append(clientOptions, rpc.WithHeaders(hdr)) + } + + return clientOptions +} + +func buildRPCHeaders() http.Header { + var hdr http.Header + if key, val := os.Getenv("RPC_HEADER_KEY"), os.Getenv("RPC_HEADER_VALUE"); key != "" && val != "" { + hdr = http.Header{} + hdr.Add(strings.TrimSpace(key), strings.TrimSpace(val)) + } + + if hdr == nil { + if legacy := strings.TrimSpace(os.Getenv("RPC_HEADER")); legacy != "" { + parts := strings.SplitN(legacy, ":", 2) + if len(parts) == 2 { + if hdr == nil { + hdr = http.Header{} + } + hdr.Add(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])) + } else { + Warnf(slog.Default(), "RPC_HEADER must be in 'Key: Value' format. Ignoring malformed value.") + } + } + } + + for k := range hdr { + Infof(slog.Default(), "Applied RPC header '%s' from environment\n", k) + } + + return hdr } func LoadFromJson(filename string, v any) { @@ -220,3 +268,54 @@ func LoadFromJson(filename string, v any) { log.Fatalf("LoadFromJson error: %v\n", err) } } + +func sanitizeRPCURL(raw string) string { + if raw == "" { + return "" + } + parsed, err := url.Parse(raw) + if err != nil { + return "" + } + parsed.User = nil + return parsed.String() +} + +func logProxyConfiguration() { + proxyVars := []string{ + "HTTPS_PROXY", + "HTTP_PROXY", + "ALL_PROXY", + "NO_PROXY", + "RPC_HEADER_KEY", + "RPC_HEADER_VALUE", + "RPC_HEADER", + } + for _, key := range proxyVars { + if val := os.Getenv(key); val != "" { + Infof(slog.Default(), "Environment variable %s is set\n", key) + } + } +} + +func rpcDialTimeout() time.Duration { + const defaultTimeout = 30 * time.Second + if val := os.Getenv("RPC_DIAL_TIMEOUT"); val != "" { + if duration, err := time.ParseDuration(val); err == nil { + return duration + } + Warnf(slog.Default(), "Invalid RPC_DIAL_TIMEOUT value %q, defaulting to %s", val, defaultTimeout) + } + return defaultTimeout +} + +func rpcReconnectInterval() time.Duration { + const defaultInterval = 10 * time.Second + if val := os.Getenv("RPC_RECONNECT_INTERVAL"); val != "" { + if duration, err := time.ParseDuration(val); err == nil { + return duration + } + Warnf(slog.Default(), "Invalid RPC_RECONNECT_INTERVAL value %q, defaulting to %s", val, defaultInterval) + } + return defaultInterval +}