Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN apk add --no-cache git build-base
# Set working directory
WORKDIR /app

ENV GOPRIVATE=gitlab.com/blockdaemon,go.blockdaemon.com/blockdaemon,gitlab.com/Blockdaemon,go.blockdaemon.com
ENV GOPRIVATE=gitlab.com/blockdaemon,go.blockdaemon.com/blockdaemon,gitlab.com/Blockdaemon,go.Blockdaemon.com

# Copy go mod files
COPY go.mod go.sum ./
Expand Down
147 changes: 123 additions & 24 deletions shared/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"log/slog"
"math/big"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -172,44 +174,90 @@ func TimeLogger(whichController string) *log.Logger {
}

func ConnectToRPC(rpcURL string) (*ethclient.Client, *big.Int) {
Infof(slog.Default(), "Connecting to RPC URL...\n")
// Build dial options and include optional HTTP/WebSocket headers from env
var clientOptions []rpc.ClientOption
clientOptions = append(clientOptions, rpc.WithWebsocketMessageSizeLimit(0))

// Support explicit key/value envs first, fallback to legacy RPC_HEADER="Key:Value"
{
var hdr http.Header
if key, val := os.Getenv("RPC_HEADER_KEY"), os.Getenv("RPC_HEADER_VALUE"); key != "" && val != "" {
hdr = http.Header{}
hdr.Add(strings.TrimSpace(key), strings.TrimSpace(val))
}
if hdr != nil {
clientOptions = append(clientOptions, rpc.WithHeaders(hdr))
for k := range hdr {
Infof(slog.Default(), "Applied RPC header '%s' from environment\n", k)
}
sanitizedRPCURL := sanitizeRPCURL(rpcURL)
for {
client, chainID, err := dialRPC(rpcURL, sanitizedRPCURL)
if err == nil {
return client, chainID
}

Warnf(
slog.Default(),
"RPC connection failed (%v). Retrying in %s...",
err,
rpcReconnectInterval(),
)
time.Sleep(rpcReconnectInterval())
}
}

func dialRPC(rpcURL, sanitizedRPCURL string) (*ethclient.Client, *big.Int, error) {
Infof(slog.Default(), "Connecting to RPC URL %s\n", sanitizedRPCURL)
logProxyConfiguration()

clientOptions := buildClientOptions()

RpcClient, err := rpc.DialOptions(
context.Background(),
ctx, cancel := context.WithTimeout(context.Background(), rpcDialTimeout())
defer cancel()

rpcClient, err := rpc.DialOptions(
ctx,
rpcURL,
clientOptions...,
)
client := ethclient.NewClient(RpcClient)

if err != nil {
log.Fatal(err)
return nil, nil, fmt.Errorf("dial %s: %w", sanitizedRPCURL, err)
}

client := ethclient.NewClient(rpcClient)

Infof(slog.Default(), "Connected\n")
Infof(slog.Default(), "Fetching Chain ID...\n")
chainID, err := client.NetworkID(context.Background())
if err != nil {
log.Fatal(err)
return nil, nil, fmt.Errorf("fetch chain ID: %w", err)
}
Infof(slog.Default(), "Chain ID: %d\n", chainID)
return client, chainID
return client, chainID, nil
}

func buildClientOptions() []rpc.ClientOption {
var clientOptions []rpc.ClientOption
clientOptions = append(clientOptions, rpc.WithWebsocketMessageSizeLimit(0))

if hdr := buildRPCHeaders(); hdr != nil {
clientOptions = append(clientOptions, rpc.WithHeaders(hdr))
}

return clientOptions
}

func buildRPCHeaders() http.Header {
var hdr http.Header
if key, val := os.Getenv("RPC_HEADER_KEY"), os.Getenv("RPC_HEADER_VALUE"); key != "" && val != "" {
hdr = http.Header{}
hdr.Add(strings.TrimSpace(key), strings.TrimSpace(val))
}

if hdr == nil {
if legacy := strings.TrimSpace(os.Getenv("RPC_HEADER")); legacy != "" {
parts := strings.SplitN(legacy, ":", 2)
if len(parts) == 2 {
if hdr == nil {
hdr = http.Header{}
}
hdr.Add(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]))
} else {
Warnf(slog.Default(), "RPC_HEADER must be in 'Key: Value' format. Ignoring malformed value.")
}
}
}

for k := range hdr {
Infof(slog.Default(), "Applied RPC header '%s' from environment\n", k)
}

return hdr
}

func LoadFromJson(filename string, v any) {
Expand All @@ -220,3 +268,54 @@ func LoadFromJson(filename string, v any) {
log.Fatalf("LoadFromJson error: %v\n", err)
}
}

func sanitizeRPCURL(raw string) string {
if raw == "" {
return "<empty>"
}
parsed, err := url.Parse(raw)
if err != nil {
return "<invalid>"
}
parsed.User = nil
return parsed.String()
}

func logProxyConfiguration() {
proxyVars := []string{
"HTTPS_PROXY",
"HTTP_PROXY",
"ALL_PROXY",
"NO_PROXY",
"RPC_HEADER_KEY",
"RPC_HEADER_VALUE",
"RPC_HEADER",
}
for _, key := range proxyVars {
if val := os.Getenv(key); val != "" {
Infof(slog.Default(), "Environment variable %s is set\n", key)
}
}
}

func rpcDialTimeout() time.Duration {
const defaultTimeout = 30 * time.Second
if val := os.Getenv("RPC_DIAL_TIMEOUT"); val != "" {
if duration, err := time.ParseDuration(val); err == nil {
return duration
}
Warnf(slog.Default(), "Invalid RPC_DIAL_TIMEOUT value %q, defaulting to %s", val, defaultTimeout)
}
return defaultTimeout
}

func rpcReconnectInterval() time.Duration {
const defaultInterval = 10 * time.Second
if val := os.Getenv("RPC_RECONNECT_INTERVAL"); val != "" {
if duration, err := time.ParseDuration(val); err == nil {
return duration
}
Warnf(slog.Default(), "Invalid RPC_RECONNECT_INTERVAL value %q, defaulting to %s", val, defaultInterval)
}
return defaultInterval
}