diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 85cf4ec..fd15e8c 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,14 +14,17 @@ jobs: - uses: actions/checkout@v2 - name: Set up Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v6 with: - go-version: 1.15 + go-version: 1.25 - - name: Get tooling + - name: Install tooling run: | - go get golang.org/x/lint/golint - go get honnef.co/go/tools/cmd/staticcheck + # Install analysis tools. staticcheck @latest for Go 1.24 compatibility. + go install honnef.co/go/tools/cmd/staticcheck@latest + # golint is deprecated; keep temporarily (will remove in follow-up) + go install golang.org/x/lint/golint@latest || echo "golint install failed (deprecated)" + echo "$(go env GOPATH)/bin" >> "$GITHUB_PATH" - name: Build run: go build -v -tags=gofuzz ./... diff --git a/cmd/openvswitch_exporter/main.go b/cmd/openvswitch_exporter/main.go index 59c93d2..a2ab22a 100644 --- a/cmd/openvswitch_exporter/main.go +++ b/cmd/openvswitch_exporter/main.go @@ -5,11 +5,17 @@ package main import ( + "context" "flag" "log" "net/http" + "os" + "os/signal" + "syscall" + "time" "github.com/digitalocean/go-openvswitch/ovsnl" + "github.com/digitalocean/openvswitch_exporter/internal/conntrack" "github.com/digitalocean/openvswitch_exporter/internal/ovsexporter" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -17,8 +23,9 @@ import ( func main() { var ( - metricsAddr = flag.String("metrics.addr", ":9310", "address for Open vSwitch exporter") - metricsPath = flag.String("metrics.path", "/metrics", "URL path for surfacing collected metrics") + metricsAddr = flag.String("metrics.addr", ":9310", "address for Open vSwitch exporter") + metricsPath = flag.String("metrics.path", "/metrics", "URL path for surfacing collected metrics") + enableConntrack = flag.Bool("enable.conntrack", true, "enable conntrack metrics exporter") ) flag.Parse() @@ -32,15 +39,59 @@ func main() { collector := ovsexporter.New(c) prometheus.MustRegister(collector) + // Optionally register conntrack collector + if *enableConntrack { + conntrackCollector, conntrackAggregator, err := conntrack.NewCollector() + if err != nil { + log.Printf("Warning: Failed to create conntrack collector: %v", err) + } else { + prometheus.MustRegister(conntrackCollector) + defer func() { + if conntrackAggregator != nil { + if err := conntrackAggregator.Stop(); err != nil { + log.Printf("Conntrack aggregator shutdown error: %v", err) + } + } + }() + log.Printf("Conntrack metrics exporter enabled") + } + } + mux := http.NewServeMux() mux.Handle(*metricsPath, promhttp.Handler()) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, *metricsPath, http.StatusMovedPermanently) }) - log.Printf("starting Open vSwitch exporter on %q", *metricsAddr) + // Create HTTP server + server := &http.Server{ + Addr: *metricsAddr, + Handler: mux, + } + + // Handle shutdown signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - if err := http.ListenAndServe(*metricsAddr, mux); err != nil { - log.Fatalf("cannot start Open vSwitch exporter: %v", err) + // Start server in goroutine + go func() { + log.Printf("starting Open vSwitch exporter on %q", *metricsAddr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("cannot start Open vSwitch exporter: %v", err) + } + }() + + // Wait for shutdown signal + sig := <-sigChan + log.Printf("Received signal %v, stopping gracefully...", sig) + + // Graceful shutdown with 15 second timeout + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + if err := server.Shutdown(ctx); err != nil { + log.Printf("Server shutdown error: %v", err) } + + log.Printf("Exporter stopped") } diff --git a/go.mod b/go.mod index 20f9632..5caef53 100644 --- a/go.mod +++ b/go.mod @@ -1,18 +1,30 @@ module github.com/digitalocean/openvswitch_exporter -go 1.15 +go 1.25 require ( github.com/digitalocean/go-openvswitch v0.0.0-20201214180534-ce0f183468d8 - github.com/google/go-cmp v0.5.4 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect - github.com/mdlayher/netlink v1.3.2 // indirect github.com/prometheus/client_golang v1.9.0 - github.com/prometheus/common v0.17.0 // indirect - github.com/prometheus/procfs v0.6.0 // indirect github.com/prometheus/prometheus v2.2.1-0.20180315085919-58e2a31db8de+incompatible - golang.org/x/net v0.0.0-20210222171744-9060382bd457 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20210223095934-7937bea0104d // indirect - google.golang.org/protobuf v1.25.0 // indirect + github.com/ti-mo/conntrack v0.6.0 + github.com/ti-mo/netfilter v0.5.3 + golang.org/x/sync v0.17.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/golang/protobuf v1.4.3 // indirect + github.com/google/go-cmp v0.7.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/mdlayher/genetlink v1.3.2 // indirect + github.com/mdlayher/netlink v1.8.0 // indirect + github.com/mdlayher/socket v0.5.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.15.0 // indirect + github.com/prometheus/procfs v0.2.0 // indirect + golang.org/x/net v0.46.0 // indirect + golang.org/x/sys v0.37.0 // indirect + google.golang.org/protobuf v1.23.0 // indirect ) diff --git a/go.sum b/go.sum index 1d1c526..5ec1a5c 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,6 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= -github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a h1:BtpsbiV638WQZwhA98cEZw2BsbnQJrbd0BI7tsy0W1c= -github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -42,10 +40,9 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/digitalocean/go-openvswitch v0.0.0-20180412190941-6a4a47d93e43 h1:WbVAw/VDkXvaFyMOkJRzKBE6bf9PY7PAfrsOY3RHnIE= -github.com/digitalocean/go-openvswitch v0.0.0-20180412190941-6a4a47d93e43/go.mod h1:MpzfscrezUxa94/T4sy2tDaxB+hQ6w0EmRBPv+xHWEs= github.com/digitalocean/go-openvswitch v0.0.0-20201214180534-ce0f183468d8 h1:RQAD2flP6n+U5sAudMpru+EuLJ6VQduu6yenl6LwM5E= github.com/digitalocean/go-openvswitch v0.0.0-20201214180534-ce0f183468d8/go.mod h1:MpzfscrezUxa94/T4sy2tDaxB+hQ6w0EmRBPv+xHWEs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -77,8 +74,6 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/protobuf v0.0.0-20171021043952-1643683e1b54 h1:nRNJXiJvemchkOTn0V4U11TZkvacB94gTzbTZbSA7Rw= -github.com/golang/protobuf v0.0.0-20171021043952-1643683e1b54/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -87,7 +82,6 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= @@ -98,10 +92,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= -github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -139,16 +131,7 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= -github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 h1:uhL5Gw7BINiiPAo24A2sxkcDI0Jt/sqp1v5xQCniEFA= -github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= -github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= -github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGuG/smIU4J/54PblvSbh+xvCZmpJnFgr3ds6Z55XMQ= -github.com/jsimonetti/rtnetlink v0.0.0-20201009170750-9c6f07d100c1/go.mod h1:hqoO/u39cqLeBLebZ8fWdE96O7FxrAsRYhnVOdgHxok= -github.com/jsimonetti/rtnetlink v0.0.0-20201216134343-bde56ed16391/go.mod h1:cR77jAZG3Y3bsb8hF6fHJbFoyFukLFOkQ98S0pQz3xw= -github.com/jsimonetti/rtnetlink v0.0.0-20201220180245-69540ac93943/go.mod h1:z4c53zj6Eex712ROyh8WI0ihysb5j2ROyV42iNogmAs= -github.com/jsimonetti/rtnetlink v0.0.0-20210122163228-8d122574c736/go.mod h1:ZXpIyOK59ZnN7J0BV99cZUPmsqDRZ3eq5X+st7u/oSA= -github.com/jsimonetti/rtnetlink v0.0.0-20210212075122-66c871082f2b/go.mod h1:8w9Rh8m+aHZIG69YPGGem1i5VzoyRC8nw2kA8B+ik5U= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -173,23 +156,12 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43/go.mod h1:+t7E0lkKfbBsebllff1xdTmyJt8lH37niI6kwFk9OTo= -github.com/mdlayher/genetlink v0.0.0-20170901181924-76fecce4c787 h1:Tbivh+kRjFJUTZmMic7LcmuzfEF/HV42ZRMY0LiQ2dU= -github.com/mdlayher/genetlink v0.0.0-20170901181924-76fecce4c787/go.mod h1:EOrmeik1bDMaRduo2B+uAYe1HmTq6yF2IMDmJi1GoWk= -github.com/mdlayher/genetlink v1.0.0 h1:OoHN1OdyEIkScEmRgxLEe2M9U8ClMytqA5niynLtfj0= -github.com/mdlayher/genetlink v1.0.0/go.mod h1:0rJ0h4itni50A86M2kHcgS85ttZazNt7a8H2a2cw0Gc= -github.com/mdlayher/netlink v0.0.0-20180326144912-dc216978b479 h1:MF+m/B1wWGiOBY92ORRiv6hGcRBX4KHqNoYIO+y2Owo= -github.com/mdlayher/netlink v0.0.0-20180326144912-dc216978b479/go.mod h1:a3TlQHkJH2m32RF224Z7LhD5N4mpyR8eUbCoYHywrwg= -github.com/mdlayher/netlink v0.0.0-20190409211403-11939a169225/go.mod h1:eQB3mZE4aiYnlUsyGGCOpPETfdQq4Jhsgf1fk3cwQaA= -github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M= -github.com/mdlayher/netlink v1.1.0/go.mod h1:H4WCitaheIsdF9yOYu8CFmCgQthAPIWZmcKp9uZHgmY= -github.com/mdlayher/netlink v1.1.1/go.mod h1:WTYpFb/WTvlRJAyKhZL5/uy69TDDpHHu2VZmb2XgV7o= -github.com/mdlayher/netlink v1.2.0/go.mod h1:kwVW1io0AZy9A1E2YYgaD4Cj+C+GPkU6klXCMzIJ9p8= -github.com/mdlayher/netlink v1.2.1/go.mod h1:bacnNlfhqHqqLo4WsYeXSqfyXkInQ9JneWI68v1KwSU= -github.com/mdlayher/netlink v1.2.2-0.20210123213345-5cc92139ae3e/go.mod h1:bacnNlfhqHqqLo4WsYeXSqfyXkInQ9JneWI68v1KwSU= -github.com/mdlayher/netlink v1.3.0/go.mod h1:xK/BssKuwcRXHrtN04UBkwQ6dY9VviGGuriDdoPSWys= -github.com/mdlayher/netlink v1.3.2 h1:fMZOU2/M7PRMzGM3br5l1N2fu6bPSHtRytmQ338a9iA= -github.com/mdlayher/netlink v1.3.2/go.mod h1:dRJi5IABcZpBD2A3D0Mv/AiX8I9uDEu5oGkAVrekmf8= +github.com/mdlayher/genetlink v1.3.2 h1:KdrNKe+CTu+IbZnm/GVUMXSqBBLqcGpRDa0xkQy56gw= +github.com/mdlayher/genetlink v1.3.2/go.mod h1:tcC3pkCrPUGIKKsCsp0B3AdaaKuHtaxoJRz3cc+528o= +github.com/mdlayher/netlink v1.8.0 h1:e7XNIYJKD7hUct3Px04RuIGJbBxy1/c4nX7D5YyvvlM= +github.com/mdlayher/netlink v1.8.0/go.mod h1:UhgKXUlDQhzb09DrCl2GuRNEglHmhYoWAHid9HK3594= +github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos= +github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -234,12 +206,12 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= -github.com/prometheus/client_golang v0.9.0-pre1.0.20171005112915-5cec1d0429b0 h1:eIVGl4K1clOaKdGaS+KSUEOwF+g2g2aIEsmikqXqRgY= -github.com/prometheus/client_golang v0.9.0-pre1.0.20171005112915-5cec1d0429b0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -247,8 +219,6 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.9.0 h1:Rrch9mh17XcxvEu9D9DEpb4isxjGBtcevQjKvxPRQIU= github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU= -github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 h1:13pIdM2tpaDi4OVe24fgoIS7ZTqMt0QI+bwQsX5hq+g= -github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -256,25 +226,19 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/common v0.0.0-20171006141418-1bab55dd05db h1:PmL7nSW2mvuotGlJKuvUcSI/eE86zwYUcIAGoB6eHBk= -github.com/prometheus/common v0.0.0-20171006141418-1bab55dd05db/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= +github.com/prometheus/common v0.15.0 h1:4fgOnadei3EZvgRwxJ7RMpG1k1pOZth5Pc13tyspaKM= github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/common v0.17.0 h1:kDIZLI74SS+3tedSvEkykgBkD7txMxaJAPj8DtJUKYA= -github.com/prometheus/common v0.17.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/procfs v0.0.0-20171226183907-b15cd069a834 h1:HRxr4uZnx/S86wVQsfXcKhadpzdceXn2qCzCtagcI6w= -github.com/prometheus/procfs v0.0.0-20171226183907-b15cd069a834/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= -github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/prometheus v2.2.1-0.20180315085919-58e2a31db8de+incompatible h1:jgW1I0kWFlDOqNLlYBcxVfpRGSOL3n6lXn1BykdEG30= github.com/prometheus/prometheus v2.2.1-0.20180315085919-58e2a31db8de+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -302,9 +266,17 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/ti-mo/conntrack v0.6.0 h1:laiW2+dzKyS2u0aVr6FeRQs+v7cj4t7q+twolL/ZkjQ= +github.com/ti-mo/conntrack v0.6.0/go.mod h1:4HZrFQQLOSuBzgQNid3H/wYyyp1kfGXUYxueXjIGibo= +github.com/ti-mo/netfilter v0.5.3 h1:ikzduvnaUMwre5bhbNwWOd6bjqLMVb33vv0XXbK0xGQ= +github.com/ti-mo/netfilter v0.5.3/go.mod h1:08SyBCg6hu1qyQk4s3DjjJKNrm3RTb32nm6AzyT972E= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= @@ -333,8 +305,6 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/net v0.0.0-20170614204310-ddf80d097059 h1:gMF+Wxxy27FCUvSZhKB22yNezu60IyLC37MHpj45QXs= -golang.org/x/net v0.0.0-20170614204310-ddf80d097059/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -351,17 +321,9 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210222171744-9060382bd457 h1:hMm9lBjyNLe/c9C6bElQxp4wsrleaJn1vXMZIQkNN44= -golang.org/x/net v0.0.0-20210222171744-9060382bd457/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= +golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -370,11 +332,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180420145319-79b0c6888797 h1:ux9vYny+vlzqIcwoO6gRu+voPvKJA10ZceuJwWf2J88= -golang.org/x/sys v0.0.0-20180420145319-79b0c6888797/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -384,36 +343,21 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201118182958-a01c418693c7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201218084310-7d0127a74742/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210110051926-789bb1bd4061/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210123111255-9b0068b26619/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210216163648-f7da38b97c65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210223095934-7937bea0104d h1:u0GOGnBJ3EKE/tNqREhhGiCzE9jFXydDo2lf7hOwGuc= -golang.org/x/sys v0.0.0-20210223095934-7937bea0104d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -431,9 +375,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -443,7 +385,6 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= @@ -453,18 +394,13 @@ google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -482,6 +418,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/conntrack/aggregator.go b/internal/conntrack/aggregator.go new file mode 100644 index 0000000..4917c0d --- /dev/null +++ b/internal/conntrack/aggregator.go @@ -0,0 +1,71 @@ +// Copyright 2017 DigitalOcean. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conntrack + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/ti-mo/conntrack" + "golang.org/x/sync/errgroup" +) + +// ZoneMarkAggregator keeps live counts (zmKey -> count) with bounded ingestion +type ZoneMarkAggregator struct { + // Configuration + config *Config + + // primary counts (zmKey -> count) - simplified flat mapping + counts map[ZoneMarkKey]int + countsMu sync.RWMutex + eventRate float64 + + // conntrack listening connection + listenCli *conntrack.Conn + listenerMu sync.Mutex // Protects listener restart operations + + // lifecycle + ctx context.Context + cancel context.CancelFunc + wg errgroup.Group + + // bounded event ingestion + eventsCh chan conntrack.Event + + // aggregated DESTROY deltas (bounded by destroyDeltaCap) + deltaMu sync.Mutex + destroyDeltas map[ZoneMarkKey]int + + // metrics / health + eventCount atomic.Int64 + lastEventTime time.Time + missedEvents atomic.Int64 + lastHealthCheck time.Time +} + +// ZoneMarkKey is a compact key for (zone,mark) +type ZoneMarkKey struct { + Zone uint16 + Mark uint32 +} + +// MarkZoneAggregator interface defines the methods needed by the collector +type MarkZoneAggregator interface { + Snapshot() map[ZoneMarkKey]int + Stop() error + Start() error +} diff --git a/internal/conntrack/aggregator_linux.go b/internal/conntrack/aggregator_linux.go new file mode 100644 index 0000000..ddb7a2a --- /dev/null +++ b/internal/conntrack/aggregator_linux.go @@ -0,0 +1,423 @@ +// Copyright 2017 DigitalOcean. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package conntrack + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/ti-mo/conntrack" + "github.com/ti-mo/netfilter" +) + +// Compile-time assertion that *ZoneMarkAggregator implements MarkZoneAggregator +var _ MarkZoneAggregator = (*ZoneMarkAggregator)(nil) + +// +// Conntrack aggregator with bounded ingestion + DESTROY aggregation +// to handle massive bursts of conntrack DESTROY events without OOMing. +// + +// NewZoneMarkAggregator creates a new aggregator with its own listening connection. +func NewZoneMarkAggregator() (MarkZoneAggregator, error) { + return NewZoneMarkAggregatorWithConfig(DefaultConfig()) +} + +// NewZoneMarkAggregatorWithConfig creates a new aggregator with custom configuration. +func NewZoneMarkAggregatorWithConfig(config *Config) (*ZoneMarkAggregator, error) { + // Create a separate connection for listening to events + listenCli, err := conntrack.Dial(nil) + if err != nil { + return nil, fmt.Errorf("failed to create listening connection: %w", err) + } + + if err := listenCli.SetReadBuffer(config.ReadBufferSize); err != nil { + log.Printf("Warning: Failed to set read buffer size: %v", err) + } + if err := listenCli.SetWriteBuffer(config.WriteBufferSize); err != nil { + log.Printf("Warning: Failed to set write buffer size: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + a := &ZoneMarkAggregator{ + config: config, + counts: make(map[ZoneMarkKey]int), + listenCli: listenCli, + ctx: ctx, + cancel: cancel, + eventsCh: make(chan conntrack.Event, config.EventChanSize), + destroyDeltas: make(map[ZoneMarkKey]int), + lastEventTime: time.Now(), + lastHealthCheck: time.Now(), + } + + return a, nil +} + +// Start subscribes to NEW/DESTROY/UPDATE events and maintains counts with bounded ingestion. +func (a *ZoneMarkAggregator) Start() error { + + if err := a.startEventListener(); err != nil { + return err + } + + for i := 0; i < a.config.EventWorkerCount; i++ { + a.wg.Go(func() error { + return a.eventWorker(a.ctx) + }) + } + + a.wg.Go(func() error { + return a.destroyFlusher(a.ctx) + }) + + a.wg.Go(func() error { + return a.startHealthMonitoring(a.ctx) + }) + + return nil +} + +// startEventListener handles real-time conntrack events, pushing into bounded eventsCh. +func (a *ZoneMarkAggregator) startEventListener() error { + libEvents := make(chan conntrack.Event, 8192) + groups := []netfilter.NetlinkGroup{ + netfilter.GroupCTNew, + netfilter.GroupCTDestroy, + netfilter.GroupCTUpdate, + } + + errCh, err := a.listenCli.Listen(libEvents, 50, groups) + if err != nil { + return fmt.Errorf("failed to listen to conntrack events: %w", err) + } + + a.wg.Go(func() error { + eventCount := int64(0) + rateWindow := make([]time.Time, 0, 100) + + for { + select { + case <-a.ctx.Done(): + log.Printf("Stopping lib->bounded relay after %d lib events", eventCount) + return a.ctx.Err() + case e := <-errCh: + if e != nil { + log.Printf("conntrack listener error: %v", e) + a.missedEvents.Add(1) + } + case ev := <-libEvents: + select { + case a.eventsCh <- ev: + eventCount++ + a.eventCount.Store(eventCount) + a.lastEventTime = time.Now() + + rateWindow = append(rateWindow, a.lastEventTime) + if len(rateWindow) > 100 { + rateWindow = rateWindow[1:] + } + if len(rateWindow) > 1 { + duration := rateWindow[len(rateWindow)-1].Sub(rateWindow[0]) + if duration > 0 { + a.eventRate = float64(len(rateWindow)-1) / duration.Seconds() + } + } + default: + a.missedEvents.Add(1) + if a.missedEvents.Load()%100 == 0 { + log.Printf("Warning: eventsCh full, missedEvents=%d", a.missedEvents.Load()) + } + } + } + } + }) + + return nil +} + +// eventWorker consumes events from eventsCh and handles them +func (a *ZoneMarkAggregator) eventWorker(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev := <-a.eventsCh: + if err := a.handleEvent(ev); err != nil { + log.Printf("Error handling event: %v", err) + // Continue processing other events, but log the error + } + } + } +} + +// handleEvent processes a single event. +func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) error { + f := ev.Flow + key := ZoneMarkKey{Zone: f.Zone, Mark: f.Mark} + + if ev.Type == conntrack.EventNew { + a.countsMu.Lock() + defer a.countsMu.Unlock() + a.counts[key]++ + return nil + } + + if ev.Type == conntrack.EventDestroy { + a.deltaMu.Lock() + defer a.deltaMu.Unlock() + if len(a.destroyDeltas) < a.config.DestroyDeltaCap { + a.destroyDeltas[key]++ + if len(a.destroyDeltas) > 50000 { // If we have >50K deltas, flush immediately + deltas := a.destroyDeltas + a.destroyDeltas = make(map[ZoneMarkKey]int) + // Acquire countsMu while still holding deltaMu to maintain lock ordering + a.countsMu.Lock() + defer a.countsMu.Unlock() + // Apply deltas immediately to minimize lag during extreme load + a.applyDeltasImmediatelyUnsafe(deltas) + return nil + } + // Log every 1000 DESTROY events to verify they're being received + if len(a.destroyDeltas)%1000 == 0 { + log.Printf("DESTROY events: %d entries in destroyDeltas (zone=%d, mark=%d)", len(a.destroyDeltas), key.Zone, key.Mark) + } + } else { + a.missedEvents.Add(1) + if a.missedEvents.Load()%a.config.DropsWarnThreshold == 0 { + log.Printf("Warning: destroyDeltas saturated (size=%d). missedEvents=%d", len(a.destroyDeltas), a.missedEvents.Load()) + } + } + return nil + } + + return nil +} + +// applyDeltasImmediatelyUnsafe applies deltas immediately to minimize lag during extreme load +// This method assumes countsMu is already held by the caller +func (a *ZoneMarkAggregator) applyDeltasImmediatelyUnsafe(deltas map[ZoneMarkKey]int) { + for k, cnt := range deltas { + existing, ok := a.counts[k] + if !ok { + a.missedEvents.Add(int64(cnt)) + continue + } + if existing <= cnt { + delete(a.counts, k) + } else { + a.counts[k] = existing - cnt + } + } +} + +// destroyFlusher periodically applies the aggregated DESTROY deltas into counts +// Uses adaptive flushing: more frequent during high event rates for minimal lag +func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) error { + ticker := time.NewTicker(a.config.DestroyFlushIntvl) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Printf("Destroy flusher stopping, final flush...") + a.flushDestroyDeltas() + return ctx.Err() + case <-ticker.C: + // Adaptive flushing: flush more frequently during high event rates + a.countsMu.RLock() + eventRate := a.eventRate + a.countsMu.RUnlock() + + if eventRate > 500000 { // Very high event rate (>500K/sec) + // Flush immediately and reset ticker for faster interval + a.flushDestroyDeltas() + ticker.Reset(50 * time.Millisecond) // 50ms during extreme load + } else if eventRate > 100000 { // High event rate (>100K/sec) + a.flushDestroyDeltas() + ticker.Reset(100 * time.Millisecond) // 100ms during high load + } else if eventRate > 10000 { // Medium event rate (>10K/sec) + a.flushDestroyDeltas() + ticker.Reset(200 * time.Millisecond) // 200ms during medium load + } else { + // Normal flush + a.flushDestroyDeltas() + ticker.Reset(a.config.DestroyFlushIntvl) // Back to normal interval + } + } + } +} + +// flushDestroyDeltas atomically swaps the delta map and applies decrements +func (a *ZoneMarkAggregator) flushDestroyDeltas() { + // First acquire deltaMu to check and swap deltas + a.deltaMu.Lock() + defer a.deltaMu.Unlock() + if len(a.destroyDeltas) == 0 { + return + } + deltas := a.destroyDeltas + a.destroyDeltas = make(map[ZoneMarkKey]int) + + // Now acquire countsMu while still holding deltaMu to ensure atomicity + a.countsMu.Lock() + defer a.countsMu.Unlock() + + for k, cnt := range deltas { + existing, ok := a.counts[k] + if !ok { + a.missedEvents.Add(int64(cnt)) + continue + } + if existing <= cnt { + delete(a.counts, k) + } else { + a.counts[k] = existing - cnt + } + } +} + +// Snapshot returns a safe copy of counts. +func (a *ZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int { + a.flushDestroyDeltas() + a.countsMu.RLock() + defer a.countsMu.RUnlock() + + out := make(map[ZoneMarkKey]int, len(a.counts)) + for k, c := range a.counts { + if c > 0 { + out[k] = c + } + } + return out +} + +// startHealthMonitoring periodically logs aggregator health +func (a *ZoneMarkAggregator) startHealthMonitoring(ctx context.Context) error { + ticker := time.NewTicker(a.config.HealthCheckIntvl) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := a.performHealthCheck(); err != nil { + log.Printf("Health check error: %v", err) + // Continue monitoring even if health check fails + } + } + } +} + +func (a *ZoneMarkAggregator) performHealthCheck() error { + missed := a.missedEvents.Load() + + if missed > a.config.DropsWarnThreshold { + if err := a.RestartListener(); err != nil { + log.Printf("Health check: RestartListener failed: %v", err) + return fmt.Errorf("failed to restart listener: %w", err) + } + a.missedEvents.Store(0) + log.Printf("Health check: Listener restarted successfully") + } + a.lastHealthCheck = time.Now() + return nil +} + +// Stop cancels listening and closes the connection with graceful shutdown. +func (a *ZoneMarkAggregator) Stop() error { + return a.stopWithTimeout(a.config.GracefulTimeout) +} + +// StopWithTimeout cancels listening and closes the connection with a configurable timeout. +func (a *ZoneMarkAggregator) stopWithTimeout(timeout time.Duration) error { + // Signal shutdown to all goroutines + a.cancel() + + // Create a context with timeout for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Channel to receive shutdown completion + done := make(chan error, 1) + + // Wait for goroutines to exit in a separate goroutine + go func() { + done <- a.wg.Wait() + }() + + // Wait for either completion or timeout + select { + case err := <-done: + if err != nil { + log.Printf("Error from goroutine group during shutdown: %v", err) + // Continue with cleanup even if there were errors + } + case <-ctx.Done(): + log.Printf("Graceful shutdown timeout exceeded (%v), forcing cleanup", timeout) + // Force close connections even if goroutines didn't exit cleanly + } + + // Close the listening connection + if a.listenCli != nil { + if err := a.listenCli.Close(); err != nil { + log.Printf("Error closing listenCli during cleanup: %v", err) + } + a.listenCli = nil + } + + // Final flush of any remaining deltas + a.flushDestroyDeltas() + + log.Printf("MarkZoneAggregator stopped gracefully") + return nil +} + +// RestartListener attempts to restart the conntrack event listener +func (a *ZoneMarkAggregator) RestartListener() error { + a.listenerMu.Lock() + defer a.listenerMu.Unlock() + + // Signal all goroutines to stop by canceling the context + a.cancel() + + // Close the old connection to help goroutines exit faster + if a.listenCli != nil { + if err := a.listenCli.Close(); err != nil { + log.Printf("Warning: Error closing old listener connection: %v", err) + } + } + + // Wait for all goroutines to exit cleanly + a.wg.Wait() + + // Create a new context for the restarted goroutines + a.ctx, a.cancel = context.WithCancel(context.Background()) + + // Create new connection + listenCli, err := conntrack.Dial(nil) + if err != nil { + return fmt.Errorf("failed to create new listening connection: %w", err) + } + a.listenCli = listenCli + + // Start new listener with fresh goroutines + return a.startEventListener() +} diff --git a/internal/conntrack/aggregator_linux_test.go b/internal/conntrack/aggregator_linux_test.go new file mode 100644 index 0000000..8f7132b --- /dev/null +++ b/internal/conntrack/aggregator_linux_test.go @@ -0,0 +1,273 @@ +// Copyright 2017 DigitalOcean. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package conntrack + +import ( + "sync" + "testing" + "time" +) + +func TestZoneMarkAggregator(t *testing.T) { + tests := []struct { + name string + setup func() (MarkZoneAggregator, error) + operations []func(MarkZoneAggregator) error + validate func(*testing.T, MarkZoneAggregator) + wantErr bool + skipOnError bool + }{ + { + name: "successful_creation", + setup: func() (MarkZoneAggregator, error) { return NewZoneMarkAggregator() }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + }, + validate: func(t *testing.T, agg MarkZoneAggregator) { + if agg == nil { + t.Fatal("expected non-nil aggregator") + } + _ = agg.Snapshot() // tolerate empty in CI + }, + wantErr: false, + skipOnError: true, + }, + { + name: "snapshot_functionality", + setup: func() (MarkZoneAggregator, error) { return NewZoneMarkAggregator() }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + }, + validate: func(t *testing.T, agg MarkZoneAggregator) { + snap := agg.Snapshot() + if snap == nil { + t.Skip("nil snapshot likely due to permissions") + } + for key, count := range snap { + if count <= 0 { + t.Errorf("Invalid count %d for key %+v", count, key) + } + } + }, + wantErr: false, + skipOnError: true, + }, + { + name: "start_stop_lifecycle", + setup: func() (MarkZoneAggregator, error) { return NewZoneMarkAggregator() }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + func(agg MarkZoneAggregator) error { + time.Sleep(10 * time.Millisecond) + return agg.Stop() + }, + }, + validate: func(t *testing.T, agg MarkZoneAggregator) { + if err := agg.Stop(); err != nil { + t.Errorf("Stop() error: %v", err) + } + _ = agg.Snapshot() + }, + wantErr: false, + skipOnError: true, + }, + { + name: "concurrent_snapshot_access", + setup: func() (MarkZoneAggregator, error) { return NewZoneMarkAggregator() }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + }, + validate: func(t *testing.T, agg MarkZoneAggregator) { + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + _ = agg.Snapshot() + }() + } + wg.Wait() + }, + wantErr: false, + skipOnError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, err := tt.setup() + if (err != nil) != tt.wantErr { + if tt.skipOnError { + t.Logf("Skipping test due to expected failure: %v", err) + return + } + t.Errorf("setup error = %v, wantErr %v", err, tt.wantErr) + return + } + if agg == nil { + if tt.skipOnError { + t.Skip("Expected failure in test environment") + return + } + t.Fatal("NewZoneMarkAggregator() returned nil aggregator") + } + + t.Cleanup(func() { agg.Stop() }) + + for i, op := range tt.operations { + if err := op(agg); err != nil { + if tt.skipOnError { + t.Logf("Skipping test due to operation %d failure: %v", i, err) + return + } + t.Errorf("operation %d failed: %v", i, err) + return + } + } + + if tt.validate != nil { + tt.validate(t, agg) + } + }) + } +} + +func TestZoneMarkKey(t *testing.T) { + tests := []struct { + name string + key1 ZoneMarkKey + key2 ZoneMarkKey + expected bool + desc string + }{ + {name: "identical_keys", key1: ZoneMarkKey{Zone: 1, Mark: 100}, key2: ZoneMarkKey{Zone: 1, Mark: 100}, expected: true, desc: "Identical ZoneMarkKey structs should be equal"}, + {name: "different_zone", key1: ZoneMarkKey{Zone: 1, Mark: 100}, key2: ZoneMarkKey{Zone: 2, Mark: 100}, expected: false, desc: "Different zone ZoneMarkKey structs should not be equal"}, + {name: "different_mark", key1: ZoneMarkKey{Zone: 1, Mark: 100}, key2: ZoneMarkKey{Zone: 1, Mark: 200}, expected: false, desc: "Different mark ZoneMarkKey structs should not be equal"}, + {name: "both_different", key1: ZoneMarkKey{Zone: 1, Mark: 100}, key2: ZoneMarkKey{Zone: 2, Mark: 200}, expected: false, desc: "Both zone and mark different should not be equal"}, + {name: "zero_values", key1: ZoneMarkKey{Zone: 0, Mark: 0}, key2: ZoneMarkKey{Zone: 0, Mark: 0}, expected: true, desc: "Zero values should be equal"}, + {name: "max_values", key1: ZoneMarkKey{Zone: 65535, Mark: 4294967295}, key2: ZoneMarkKey{Zone: 65535, Mark: 4294967295}, expected: true, desc: "Max values should be equal"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := (tt.key1 == tt.key2) + if result != tt.expected { + t.Errorf("%s: got %v, want %v", tt.desc, result, tt.expected) + } + }) + } +} + +func TestZoneMarkKeyAsMapKey(t *testing.T) { + tests := []struct { + name string + keys []ZoneMarkKey + values []int + lookup ZoneMarkKey + expected int + desc string + }{ + {name: "basic_map_operations", keys: []ZoneMarkKey{{Zone: 1, Mark: 100}, {Zone: 2, Mark: 200}}, values: []int{5, 10}, lookup: ZoneMarkKey{Zone: 1, Mark: 100}, expected: 5, desc: "ZoneMarkKey should work as map key"}, + {name: "equal_keys_map_to_same_value", keys: []ZoneMarkKey{{Zone: 1, Mark: 100}, {Zone: 2, Mark: 200}}, values: []int{5, 10}, lookup: ZoneMarkKey{Zone: 1, Mark: 100}, expected: 5, desc: "Equal ZoneMarkKey structs should map to same value"}, + {name: "different_keys_map_to_different_values", keys: []ZoneMarkKey{{Zone: 1, Mark: 100}, {Zone: 2, Mark: 200}}, values: []int{5, 10}, lookup: ZoneMarkKey{Zone: 2, Mark: 200}, expected: 10, desc: "Different ZoneMarkKey should map to different value"}, + {name: "zero_key_operations", keys: []ZoneMarkKey{{Zone: 0, Mark: 0}, {Zone: 1, Mark: 1}}, values: []int{100, 200}, lookup: ZoneMarkKey{Zone: 0, Mark: 0}, expected: 100, desc: "Zero value keys should work correctly"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testMap := make(map[ZoneMarkKey]int) + for i, key := range tt.keys { + testMap[key] = tt.values[i] + } + result := testMap[tt.lookup] + if result != tt.expected { + t.Errorf("%s: got %d, want %d", tt.desc, result, tt.expected) + } + }) + } +} + +func TestAggregatorLifecycle(t *testing.T) { + tests := []struct { + name string + operations []func(MarkZoneAggregator) error + validate func(*testing.T, MarkZoneAggregator) + wantErr bool + skipOnError bool + }{ + { + name: "start_twice_should_fail", + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + func(agg MarkZoneAggregator) error { return agg.Start() }, // second start + }, + validate: func(t *testing.T, agg MarkZoneAggregator) {}, + wantErr: false, + skipOnError: true, + }, + { + name: "stop_without_start", + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Stop() }, + }, + validate: func(t *testing.T, agg MarkZoneAggregator) { _ = agg.Snapshot() }, + wantErr: false, + }, + { + name: "snapshot_after_stop", + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + func(agg MarkZoneAggregator) error { time.Sleep(10 * time.Millisecond); return agg.Stop() }, + }, + validate: func(t *testing.T, agg MarkZoneAggregator) { _ = agg.Snapshot() }, + wantErr: false, + skipOnError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, err := NewZoneMarkAggregator() + if err != nil { + if tt.skipOnError { + t.Logf("Skipping test due to expected failure: %v", err) + return + } + t.Fatalf("Failed to create aggregator: %v", err) + } + if agg == nil { + t.Fatal("NewZoneMarkAggregator() returned nil aggregator") + } + t.Cleanup(func() { agg.Stop() }) + for i, op := range tt.operations { + if err := op(agg); err != nil { + if (err != nil) != tt.wantErr { + if tt.skipOnError { + t.Logf("Skipping op %d failure: %v", i, err) + return + } + t.Errorf("operation %d error = %v, wantErr %v", i, err, tt.wantErr) + return + } + } + } + if tt.validate != nil { + tt.validate(t, agg) + } + }) + } +} diff --git a/internal/conntrack/config.go b/internal/conntrack/config.go new file mode 100644 index 0000000..2b186e5 --- /dev/null +++ b/internal/conntrack/config.go @@ -0,0 +1,109 @@ +// Copyright 2017 DigitalOcean. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conntrack + +import ( + "time" +) + +// Config holds configuration for the conntrack aggregator. +// +// The conntrack aggregator uses a default configuration system. The exporter currently +// uses default values for all conntrack settings. Custom configuration is supported +// programmatically via this Config struct and NewZoneMarkAggregatorWithConfig() function, +// but is not currently exposed at the top-level exporter interface. +// +// To use custom configuration, you would need to modify the exporter code to pass a +// custom Config struct instead of using NewZoneMarkAggregator() which always uses defaults. +type Config struct { + // EventChanSize is the buffer size for the bounded event channel that receives + // conntrack events from the netlink listener (default: 524288 = 512KB). + // This channel acts as a buffer between the raw netlink events and the event + // workers. When full, events are dropped and missedEvents counter is incremented. + EventChanSize int + + // EventWorkerCount is the number of goroutines that process events from the + // bounded event channel (default: 100). Each worker + // processes NEW/DESTROY/UPDATE events. + EventWorkerCount int + + // DestroyFlushIntvl is the base interval for flushing aggregated DESTROY deltas + // into the main counts map (default: 50ms). The actual flushing is adaptive: + // - >500K events/sec: 50ms intervals + // - >100K events/sec: 100ms intervals + // - >10K events/sec: 200ms intervals + // - Normal: uses this configured interval + // Faster flushing reduces latency but uses more CPU. + DestroyFlushIntvl time.Duration + + // DestroyDeltaCap is the maximum number of DESTROY deltas that can be accumulated + // before dropping events (default: 200000). DESTROY events are aggregated into + // deltas to handle massive bursts without OOM. When this cap is reached, new + // DESTROY events are dropped and missedEvents counter is incremented. + DestroyDeltaCap int + + // DropsWarnThreshold is the threshold for triggering health check actions + // (default: 10000). When missed events exceed this threshold, the health + // monitor will attempt to restart the conntrack listener to recover from + // potential connection issues. + DropsWarnThreshold int64 + + // ReadBufferSize is the socket read buffer size for the conntrack netlink + // connection (default: 67108864 = 64MB). This affects how much data can be + // buffered at the kernel level before being read by the application. + ReadBufferSize int + + // WriteBufferSize is the socket write buffer size for the conntrack netlink + // connection (default: 67108864 = 64MB). This affects how much data can be + // buffered for writes to the kernel. + WriteBufferSize int + + // HealthCheckIntvl is the interval for periodic health monitoring (default: 5m). + // The health monitor checks missed events count and restarts the listener + // if drops exceed DropsWarnThreshold. + HealthCheckIntvl time.Duration + + // GracefulTimeout is the maximum time to wait for graceful shutdown of all + // goroutines during Stop() (default: 30s). This includes waiting for event + // workers to finish, flushing remaining deltas, and closing connections. + GracefulTimeout time.Duration +} + +// DefaultConfig returns default configuration values suitable for most production environments. +// This is used internally by NewZoneMarkAggregator(). +// +// Default values: +// - EventChanSize: 524288 (512KB) +// - EventWorkerCount: 100 +// - DestroyFlushIntvl: 50ms +// - DestroyDeltaCap: 200000 +// - DropsWarnThreshold: 10000 +// - ReadBufferSize: 67108864 (64MB) +// - WriteBufferSize: 67108864 (64MB) +// - HealthCheckIntvl: 5m +// - GracefulTimeout: 30s +func DefaultConfig() *Config { + return &Config{ + EventChanSize: 512 * 1024, + EventWorkerCount: 100, + DestroyFlushIntvl: 50 * time.Millisecond, + DestroyDeltaCap: 200000, + DropsWarnThreshold: 10000, + ReadBufferSize: 64 * 1024 * 1024, + WriteBufferSize: 64 * 1024 * 1024, + HealthCheckIntvl: 5 * time.Minute, + GracefulTimeout: 30 * time.Second, + } +} diff --git a/internal/conntrack/exporter.go b/internal/conntrack/exporter.go new file mode 100644 index 0000000..57dc3ee --- /dev/null +++ b/internal/conntrack/exporter.go @@ -0,0 +1,89 @@ +// Copyright 2017 DigitalOcean. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conntrack + +import ( + "fmt" + "log" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "openvswitch" +) + +// Collector is a Prometheus collector for conntrack entries by zone and mark. +type Collector struct { + desc *prometheus.Desc + agg MarkZoneAggregator +} + +// Compile-time assertion that *Collector implements prometheus.Collector +var _ prometheus.Collector = (*Collector)(nil) + +// NewCollector creates a new Prometheus collector for conntrack metrics. +// It starts the aggregator and returns a collector that can be registered +// with Prometheus. The aggregator must be stopped separately via Stop(). +func NewCollector() (prometheus.Collector, MarkZoneAggregator, error) { + agg, err := NewZoneMarkAggregator() + if err != nil { + return nil, nil, fmt.Errorf("failed to create zone/mark aggregator: %w", err) + } + + if err := agg.Start(); err != nil { + return nil, nil, fmt.Errorf("failed to start zone/mark aggregator: %w", err) + } + + return &Collector{ + desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "conntrack", "entries"), + "Number of conntrack entries by zone and mark", + []string{"zone", "mark"}, + nil, + ), + agg: agg, + }, agg, nil +} + +// Describe implements prometheus.Collector. +func (c *Collector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements prometheus.Collector. +func (c *Collector) Collect(ch chan<- prometheus.Metric) { + if c.agg == nil { + log.Printf("No aggregator available, emitting zero metric") + ch <- prometheus.MustNewConstMetric( + c.desc, + prometheus.GaugeValue, + 0, + "unknown", "unknown", + ) + return + } + + snapshot := c.agg.Snapshot() + for key, count := range snapshot { + ch <- prometheus.MustNewConstMetric( + c.desc, + prometheus.GaugeValue, + float64(count), + fmt.Sprintf("%d", key.Zone), + fmt.Sprintf("%d", key.Mark), + ) + } +} diff --git a/internal/conntrack/exporter_mock_test.go b/internal/conntrack/exporter_mock_test.go new file mode 100644 index 0000000..69c48c3 --- /dev/null +++ b/internal/conntrack/exporter_mock_test.go @@ -0,0 +1,462 @@ +//go:build !linux +// +build !linux + +package conntrack + +import ( + "sync" + "testing" + "time" + + "bytes" + "io" + "net/http" + "net/http/httptest" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/prometheus/util/promlint" +) + +func testCollector(t *testing.T, collector prometheus.Collector) []byte { + t.Helper() + + // Set up and gather metrics from a single pass. + reg := prometheus.NewPedanticRegistry() + if err := reg.Register(collector); err != nil { + t.Fatalf("failed to register Prometheus collector: %v", err) + } + + srv := httptest.NewServer(promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + defer srv.Close() + + resp, err := http.Get(srv.URL) + if err != nil { + t.Fatalf("failed to GET data from prometheus: %v", err) + } + defer resp.Body.Close() + + buf, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("failed to read server response: %v", err) + } + + // Check for lint cleanliness of metrics. + problems, err := promlint.New(bytes.NewReader(buf)).Lint() + if err != nil { + t.Fatalf("failed to lint metrics: %v", err) + } + + if len(problems) > 0 { + for _, p := range problems { + t.Logf("\t%s: %s", p.Metric, p.Text) + } + + t.Fatal("failing test due to lint problems") + } + + // Metrics check out, return to caller for further tests. + return buf +} + +func TestCollector(t *testing.T) { + tests := []struct { + name string + setup func() (MarkZoneAggregator, error) + operations []func(MarkZoneAggregator) error + validate func(*testing.T, *Collector) + wantErr bool + description string + }{ + { + name: "basic_functionality", + setup: func() (MarkZoneAggregator, error) { + return NewMockZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { + // Add test data + mockAgg := agg.(*MockZoneMarkAggregator) + mockAgg.SetCount(0, 100, 1500) + mockAgg.SetCount(0, 200, 2500) + mockAgg.SetCount(1, 300, 3500) + return nil + }, + }, + validate: func(t *testing.T, collector *Collector) { + if collector == nil { + t.Fatal("expected non-nil collector") + } + if collector.desc == nil { + t.Fatal("expected non-nil description") + } + }, + wantErr: false, + description: "Test basic collector functionality with mock data", + }, + { + name: "nil_aggregator", + setup: func() (MarkZoneAggregator, error) { + return nil, nil + }, + operations: []func(MarkZoneAggregator) error{}, + validate: func(t *testing.T, collector *Collector) { + if collector == nil { + t.Fatal("expected non-nil collector") + } + if collector.agg != nil { + t.Error("expected nil aggregator") + } + }, + wantErr: false, + description: "Test collector handles nil aggregator gracefully", + }, + { + name: "empty_aggregator", + setup: func() (MarkZoneAggregator, error) { + return NewMockZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{}, + validate: func(t *testing.T, collector *Collector) { + if collector == nil { + t.Fatal("expected non-nil collector") + } + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Fatal("expected non-nil snapshot") + } + if len(snapshot) != 0 { + t.Errorf("expected empty snapshot, got %d entries", len(snapshot)) + } + }, + wantErr: false, + description: "Test collector with empty aggregator", + }, + { + name: "large_dataset", + setup: func() (MarkZoneAggregator, error) { + return NewMockZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { + // Add large dataset - simulate 10K entries across multiple zones + mockAgg := agg.(*MockZoneMarkAggregator) + for zone := uint16(0); zone < 10; zone++ { + for mark := uint32(0); mark < 1000; mark++ { + mockAgg.SetCount(zone, mark, int(uint32(zone)*1000+mark)) + } + } + return nil + }, + }, + validate: func(t *testing.T, collector *Collector) { + snapshot := collector.agg.Snapshot() + if len(snapshot) != 10000 { + t.Errorf("expected 10000 entries, got %d", len(snapshot)) + } + }, + wantErr: false, + description: "Test collector with large dataset", + }, + { + name: "edge_cases", + setup: func() (MarkZoneAggregator, error) { + return NewMockZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { + mockAgg := agg.(*MockZoneMarkAggregator) + // Test zero values + mockAgg.SetCount(0, 0, 0) + // Test maximum values + mockAgg.SetCount(65535, 4294967295, 1000000) + // Test negative count (should be handled gracefully) + mockAgg.SetCount(1, 1, -1) + return nil + }, + }, + validate: func(t *testing.T, collector *Collector) { + snapshot := collector.agg.Snapshot() + // Should have 2 entries (zero and negative counts should be filtered out) + if len(snapshot) != 2 { + t.Errorf("expected 2 entries, got %d", len(snapshot)) + } + }, + wantErr: false, + description: "Test collector with edge cases", + }, + { + name: "concurrent_operations", + setup: func() (MarkZoneAggregator, error) { return NewMockZoneMarkAggregator() }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { + mockAgg := agg.(*MockZoneMarkAggregator) + mockAgg.SetCount(0, 100, 100) + return nil + }, + }, + validate: func(t *testing.T, collector *Collector) { + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Error("Concurrent snapshot returned nil") + } + }() + } + wg.Wait() + }, + wantErr: false, + description: "Test concurrent collector operations", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, err := tt.setup() + if (err != nil) != tt.wantErr { + t.Errorf("setup error = %v, wantErr %v", err, tt.wantErr) + return + } + + if agg != nil { + t.Cleanup(func() { agg.Stop() }) + } + + for i, op := range tt.operations { + if err := op(agg); err != nil { + t.Errorf("operation %d failed: %v", i, err) + return + } + } + + collector := &Collector{agg: agg} + if tt.validate != nil { + tt.validate(t, collector) + } + + // Test the collector with Prometheus + testCollector(t, collector) + }) + } +} + +func TestMockAggregatorOperations(t *testing.T) { + tests := []struct { + name string + operations []func(*MockZoneMarkAggregator) + validate func(*testing.T, *MockZoneMarkAggregator) + description string + }{ + { + name: "add_remove_entries", + operations: []func(*MockZoneMarkAggregator){ + func(agg *MockZoneMarkAggregator) { + agg.AddEntry(0, 100) + agg.AddEntry(0, 100) + agg.AddEntry(1, 200) + }, + func(agg *MockZoneMarkAggregator) { + agg.RemoveEntry(0, 100) + }, + }, + validate: func(t *testing.T, agg *MockZoneMarkAggregator) { + snapshot := agg.Snapshot() + if len(snapshot) != 2 { + t.Errorf("expected 2 entries, got %d", len(snapshot)) + } + // Check specific counts + key1 := ZoneMarkKey{Zone: 0, Mark: 100} + key2 := ZoneMarkKey{Zone: 1, Mark: 200} + if snapshot[key1] != 1 { + t.Errorf("expected count 1 for key %v, got %d", key1, snapshot[key1]) + } + if snapshot[key2] != 1 { + t.Errorf("expected count 1 for key %v, got %d", key2, snapshot[key2]) + } + }, + description: "Test add/remove entry operations", + }, + { + name: "set_count_operations", + operations: []func(*MockZoneMarkAggregator){ + func(agg *MockZoneMarkAggregator) { + agg.SetCount(0, 100, 1500) + agg.SetCount(1, 200, 2500) + agg.SetCount(2, 300, 0) // Should be filtered out + }, + }, + validate: func(t *testing.T, agg *MockZoneMarkAggregator) { + snapshot := agg.Snapshot() + if len(snapshot) != 2 { + t.Errorf("expected 2 entries, got %d", len(snapshot)) + } + key1 := ZoneMarkKey{Zone: 0, Mark: 100} + key2 := ZoneMarkKey{Zone: 1, Mark: 200} + if snapshot[key1] != 1500 { + t.Errorf("expected count 1500 for key %v, got %d", key1, snapshot[key1]) + } + if snapshot[key2] != 2500 { + t.Errorf("expected count 2500 for key %v, got %d", key2, snapshot[key2]) + } + }, + description: "Test set count operations", + }, + { + name: "clear_operations", + operations: []func(*MockZoneMarkAggregator){ + func(agg *MockZoneMarkAggregator) { + agg.SetCount(0, 100, 1500) + agg.SetCount(1, 200, 2500) + }, + func(agg *MockZoneMarkAggregator) { + agg.Clear() + }, + }, + validate: func(t *testing.T, agg *MockZoneMarkAggregator) { + snapshot := agg.Snapshot() + if len(snapshot) != 0 { + t.Errorf("expected empty snapshot after clear, got %d entries", len(snapshot)) + } + }, + description: "Test clear operations", + }, + { + name: "health_metrics", + operations: []func(*MockZoneMarkAggregator){ + func(agg *MockZoneMarkAggregator) { + agg.SetCount(0, 100, 1000) + }, + }, + validate: func(t *testing.T, agg *MockZoneMarkAggregator) { + if !agg.IsHealthy() { + t.Error("expected healthy aggregator") + } + if agg.GetEventRate() != 100.0 { + t.Errorf("expected event rate 100.0, got %f", agg.GetEventRate()) + } + if agg.GetMissedEvents() != 0 { + t.Errorf("expected 0 missed events, got %d", agg.GetMissedEvents()) + } + lastEventTime := agg.GetLastEventTime() + if lastEventTime.IsZero() { + t.Error("expected non-zero last event time") + } + }, + description: "Test health metrics", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, err := NewMockZoneMarkAggregator() + if err != nil { + t.Fatalf("Failed to create mock aggregator: %v", err) + } + t.Cleanup(func() { agg.Stop() }) + + for _, op := range tt.operations { + op(agg) + // Add small delay between operations to test timing + time.Sleep(1 * time.Millisecond) + } + + if tt.validate != nil { + tt.validate(t, agg) + } + }) + } +} + +func TestCollectorIntegration(t *testing.T) { + tests := []struct { + name string + setup func() (*MockZoneMarkAggregator, error) + operations []func(*MockZoneMarkAggregator) + validate func(*testing.T, *Collector) + description string + }{ + { + name: "full_lifecycle", + setup: func() (*MockZoneMarkAggregator, error) { + return NewMockZoneMarkAggregator() + }, + operations: []func(*MockZoneMarkAggregator){ + func(agg *MockZoneMarkAggregator) { + // Start the aggregator + agg.Start() + }, + func(agg *MockZoneMarkAggregator) { + // Add some data + agg.SetCount(0, 100, 1500) + agg.SetCount(1, 200, 2500) + }, + func(agg *MockZoneMarkAggregator) { + // Modify data + agg.AddEntry(0, 100) + agg.RemoveEntry(1, 200) + }, + }, + validate: func(t *testing.T, collector *Collector) { + // Test that collector can handle the aggregator + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Fatal("expected non-nil snapshot") + } + // Should have 2 entries (one added, one removed) + if len(snapshot) != 2 { + t.Errorf("expected 2 entries, got %d", len(snapshot)) + } + }, + description: "Test full lifecycle with collector", + }, + { + name: "stress_test", + setup: func() (*MockZoneMarkAggregator, error) { + return NewMockZoneMarkAggregator() + }, + operations: []func(*MockZoneMarkAggregator){ + func(agg *MockZoneMarkAggregator) { + // Add many entries rapidly + for i := 0; i < 1000; i++ { + agg.SetCount(uint16(i%10), uint32(i), i) + } + }, + }, + validate: func(t *testing.T, collector *Collector) { + snapshot := collector.agg.Snapshot() + if len(snapshot) != 1000 { + t.Errorf("expected 1000 entries, got %d", len(snapshot)) + } + }, + description: "Test stress scenario with many entries", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, err := tt.setup() + if err != nil { + t.Fatalf("Failed to create mock aggregator: %v", err) + } + t.Cleanup(func() { agg.Stop() }) + + for _, op := range tt.operations { + op(agg) + // Small delay between operations + time.Sleep(1 * time.Millisecond) + } + + collector := &Collector{agg: agg} + if tt.validate != nil { + tt.validate(t, collector) + } + + // Test with Prometheus + testCollector(t, collector) + }) + } +} diff --git a/internal/conntrack/exporter_test.go b/internal/conntrack/exporter_test.go new file mode 100644 index 0000000..8e3bdf9 --- /dev/null +++ b/internal/conntrack/exporter_test.go @@ -0,0 +1,406 @@ +//go:build linux +// +build linux + +// Copyright 2018-2021 DigitalOcean. +// SPDX-License-Identifier: Apache-2.0 + +package conntrack + +import ( + "sync" + "testing" + "time" + + "bytes" + "io" + "net/http" + "net/http/httptest" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/prometheus/util/promlint" +) + +func testCollector(t *testing.T, collector prometheus.Collector) []byte { + t.Helper() + + // Set up and gather metrics from a single pass. + reg := prometheus.NewPedanticRegistry() + if err := reg.Register(collector); err != nil { + t.Fatalf("failed to register Prometheus collector: %v", err) + } + + srv := httptest.NewServer(promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + defer srv.Close() + + resp, err := http.Get(srv.URL) + if err != nil { + t.Fatalf("failed to GET data from prometheus: %v", err) + } + defer resp.Body.Close() + + buf, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("failed to read server response: %v", err) + } + + // Check for lint cleanliness of metrics. + problems, err := promlint.New(bytes.NewReader(buf)).Lint() + if err != nil { + t.Fatalf("failed to lint metrics: %v", err) + } + + if len(problems) > 0 { + for _, p := range problems { + t.Logf("\t%s: %s", p.Metric, p.Text) + } + + t.Fatal("failing test due to lint problems") + } + + // Metrics check out, return to caller for further tests. + return buf +} + +func TestCollector(t *testing.T) { + tests := []struct { + name string + setup func() (MarkZoneAggregator, error) + operations []func(MarkZoneAggregator) error + validate func(*testing.T, *Collector) + wantErr bool + skipOnError bool + description string + }{ + { + name: "real_aggregator_creation", + setup: func() (MarkZoneAggregator, error) { + return NewZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + }, + validate: func(t *testing.T, collector *Collector) { + if collector == nil { + t.Fatal("expected non-nil collector") + } + if collector.desc == nil { + t.Fatal("expected non-nil description") + } + if collector.agg == nil { + t.Fatal("expected non-nil aggregator") + } + }, + wantErr: false, + skipOnError: true, // Skip if permission issues + description: "Test collector with real aggregator creation", + }, + { + name: "nil_aggregator_handling", + setup: func() (MarkZoneAggregator, error) { + return nil, nil + }, + operations: []func(MarkZoneAggregator) error{}, + validate: func(t *testing.T, collector *Collector) { + // Intentionally minimal: do not attempt Prometheus registration when agg is nil. + if collector.agg != nil { + t.Errorf("expected nil aggregator, got non-nil") + } + }, + wantErr: false, + skipOnError: false, + description: "Test collector handles nil aggregator gracefully", + }, + { + name: "real_data_processing", + setup: func() (MarkZoneAggregator, error) { + return NewZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + func(agg MarkZoneAggregator) error { + // Let it run briefly to potentially collect real data + time.Sleep(50 * time.Millisecond) + return nil + }, + }, + validate: func(t *testing.T, collector *Collector) { + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Fatal("expected non-nil snapshot") + } + // In test environment, snapshot might be empty + t.Logf("Snapshot contains %d entries", len(snapshot)) + }, + wantErr: false, + skipOnError: true, + description: "Test collector with real data processing", + }, + { + name: "concurrent_collection", + setup: func() (MarkZoneAggregator, error) { return NewZoneMarkAggregator() }, + operations: []func(MarkZoneAggregator) error{func(agg MarkZoneAggregator) error { return agg.Start() }}, + validate: func(t *testing.T, collector *Collector) { + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Error("Concurrent snapshot returned nil") + } + }() + } + wg.Wait() + }, + wantErr: false, + skipOnError: true, + description: "Test concurrent collection operations", + }, + { + name: "lifecycle_management", + setup: func() (MarkZoneAggregator, error) { + return NewZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + func(agg MarkZoneAggregator) error { + // Let it run briefly + time.Sleep(10 * time.Millisecond) + return nil + }, + func(agg MarkZoneAggregator) error { + // Stop the aggregator + agg.Stop() + return nil + }, + }, + validate: func(t *testing.T, collector *Collector) { + // Snapshot should still work after stop + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Error("Snapshot should work after stop") + } + }, + wantErr: false, + skipOnError: true, + description: "Test aggregator lifecycle management", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, err := tt.setup() + if (err != nil) != tt.wantErr { + if tt.skipOnError { + t.Logf("Skipping test due to expected failure: %v", err) + return + } + t.Errorf("setup error = %v, wantErr %v", err, tt.wantErr) + return + } + + if agg != nil { + t.Cleanup(func() { agg.Stop() }) + } + + for i, op := range tt.operations { + if err := op(agg); err != nil { + if tt.skipOnError { + t.Logf("Skipping test due to operation %d failure: %v", i, err) + return + } + t.Errorf("operation %d failed: %v", i, err) + return + } + } + + collector := &Collector{agg: agg} + if tt.validate != nil { + tt.validate(t, collector) + } + + // Only run Prometheus registration when we have a non-nil aggregator. + if agg != nil { + testCollector(t, collector) + } else { + t.Log("Skipping Prometheus registration for nil aggregator case") + } + }) + } +} + +func TestCollectorWithRealData(t *testing.T) { + if testing.Short() { + t.Skip("Skipping conntrack test in short mode") + } + + tests := []struct { + name string + duration time.Duration + validate func(*testing.T, *Collector) + description string + }{ + { + name: "short_duration", + duration: 100 * time.Millisecond, + validate: func(t *testing.T, collector *Collector) { + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Fatal("expected non-nil snapshot") + } + t.Logf("Short duration test: %d entries collected", len(snapshot)) + }, + description: "Test with short data collection duration", + }, + { + name: "medium_duration", + duration: 500 * time.Millisecond, + validate: func(t *testing.T, collector *Collector) { + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Fatal("expected non-nil snapshot") + } + t.Logf("Medium duration test: %d entries collected", len(snapshot)) + }, + description: "Test with medium data collection duration", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test with real conntrack data if available + agg, err := NewZoneMarkAggregator() + if err != nil { + t.Skipf("Skipping real data test: %v", err) + } + + t.Cleanup(func() { agg.Stop() }) + + // Start the aggregator + if err := agg.Start(); err != nil { + t.Skipf("Skipping real data test - failed to start: %v", err) + } + + // Wait for data to accumulate + time.Sleep(tt.duration) + + collector := &Collector{agg: agg} + if tt.validate != nil { + tt.validate(t, collector) + } + + // Test the collector with Prometheus + testCollector(t, collector) + }) + } +} + +func TestCollectorEdgeCases(t *testing.T) { + tests := []struct { + name string + setup func() (MarkZoneAggregator, error) + operations []func(MarkZoneAggregator) error + validate func(*testing.T, *Collector) + wantErr bool + skipOnError bool + description string + }{ + { + name: "start_stop_multiple_times", + setup: func() (MarkZoneAggregator, error) { + return NewZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { return agg.Start() }, + func(agg MarkZoneAggregator) error { + time.Sleep(10 * time.Millisecond) + agg.Stop() + return nil + }, + func(agg MarkZoneAggregator) error { + // Try to start again after stop + return agg.Start() + }, + }, + validate: func(t *testing.T, collector *Collector) { + // Should handle restart gracefully + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Error("Snapshot should work after restart") + } + }, + wantErr: false, + skipOnError: true, + description: "Test start/stop multiple times", + }, + { + name: "rapid_start_stop_cycles", + setup: func() (MarkZoneAggregator, error) { + return NewZoneMarkAggregator() + }, + operations: []func(MarkZoneAggregator) error{ + func(agg MarkZoneAggregator) error { + // Rapid start/stop cycles + for i := 0; i < 5; i++ { + if err := agg.Start(); err != nil { + return err + } + time.Sleep(1 * time.Millisecond) + agg.Stop() + time.Sleep(1 * time.Millisecond) + } + return nil + }, + }, + validate: func(t *testing.T, collector *Collector) { + // Should not panic or leak resources + snapshot := collector.agg.Snapshot() + if snapshot == nil { + t.Error("Snapshot should work after rapid cycles") + } + }, + wantErr: false, + skipOnError: true, + description: "Test rapid start/stop cycles", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, err := tt.setup() + if (err != nil) != tt.wantErr { + if tt.skipOnError { + t.Logf("Skipping test due to expected failure: %v", err) + return + } + t.Errorf("setup error = %v, wantErr %v", err, tt.wantErr) + return + } + + if agg != nil { + t.Cleanup(func() { agg.Stop() }) + } + + for i, op := range tt.operations { + if err := op(agg); err != nil { + if tt.skipOnError { + t.Logf("Skipping test due to operation %d failure: %v", i, err) + return + } + t.Errorf("operation %d failed: %v", i, err) + return + } + } + + collector := &Collector{agg: agg} + if tt.validate != nil { + tt.validate(t, collector) + } + + // Test the collector with Prometheus + testCollector(t, collector) + }) + } +} diff --git a/internal/conntrack/mock.go b/internal/conntrack/mock.go new file mode 100644 index 0000000..a0d1fa6 --- /dev/null +++ b/internal/conntrack/mock.go @@ -0,0 +1,138 @@ +//go:build !linux +// +build !linux + +package conntrack + +import ( + "context" + "sync" + "time" +) + +// Compile-time assertion that *MockZoneMarkAggregator implements MarkZoneAggregator +var _ MarkZoneAggregator = (*MockZoneMarkAggregator)(nil) + +// MockZoneMarkAggregator provides a mock implementation for non-Linux platforms +type MockZoneMarkAggregator struct { + *ZoneMarkAggregator + counts map[ZoneMarkKey]int + countsMu sync.RWMutex +} + +// NewMockZoneMarkAggregator creates a mock aggregator for testing +func NewMockZoneMarkAggregator() (*MockZoneMarkAggregator, error) { + return NewZoneMarkAggregatorWithConfig(DefaultConfig()) +} + +// NewZoneMarkAggregator provides a linux-compatible constructor name on non-Linux +// platforms. This allows calling code to use conntrack.NewZoneMarkAggregator() +// uniformly across OSes while receiving a mock implementation on non-Linux. +func NewZoneMarkAggregator() (MarkZoneAggregator, error) { //nolint:golint // cross-platform parity, returns interface + return NewZoneMarkAggregatorWithConfig(DefaultConfig()) +} + +// NewZoneMarkAggregatorWithConfig creates a mock aggregator with custom configuration +func NewZoneMarkAggregatorWithConfig(config *Config) (*MockZoneMarkAggregator, error) { + ctx, cancel := context.WithCancel(context.Background()) + return &MockZoneMarkAggregator{ + ZoneMarkAggregator: &ZoneMarkAggregator{ + config: config, + ctx: ctx, + cancel: cancel, + }, + counts: make(map[ZoneMarkKey]int), + }, nil +} + +// Snapshot returns a copy of the current counts +func (m *MockZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int { + m.countsMu.RLock() + defer m.countsMu.RUnlock() + + snapshot := make(map[ZoneMarkKey]int) + for k, v := range m.counts { + snapshot[k] = v + } + return snapshot +} + +// Start starts the mock aggregator (no-op for mock) +func (m *MockZoneMarkAggregator) Start() error { + return nil +} + +// Stop stops the mock aggregator with graceful shutdown +func (m *MockZoneMarkAggregator) Stop() error { + m.cancel() + // Mock implementation doesn't need actual cleanup + return nil +} + +// AddEntry adds a mock entry for testing +func (m *MockZoneMarkAggregator) AddEntry(zone uint16, mark uint32) { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + key := ZoneMarkKey{Zone: zone, Mark: mark} + m.counts[key]++ +} + +// RemoveEntry removes a mock entry for testing +func (m *MockZoneMarkAggregator) RemoveEntry(zone uint16, mark uint32) { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + key := ZoneMarkKey{Zone: zone, Mark: mark} + if m.counts[key] > 0 { + m.counts[key]-- + if m.counts[key] == 0 { + delete(m.counts, key) + } + } +} + +// SetCount sets a specific count for testing +func (m *MockZoneMarkAggregator) SetCount(zone uint16, mark uint32, count int) { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + key := ZoneMarkKey{Zone: zone, Mark: mark} + if count <= 0 { + delete(m.counts, key) + } else { + m.counts[key] = count + } +} + +// Clear clears all counts +func (m *MockZoneMarkAggregator) Clear() { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + m.counts = make(map[ZoneMarkKey]int) +} + +// GetEventRate returns a mock event rate +func (m *MockZoneMarkAggregator) GetEventRate() float64 { + return 100.0 // Mock rate +} + +// GetEventCount returns a mock event count +func (m *MockZoneMarkAggregator) GetEventCount() int64 { + return int64(len(m.counts)) * 10 // Mock count +} + +// GetMissedEvents returns a mock missed events count +func (m *MockZoneMarkAggregator) GetMissedEvents() int64 { + return 0 // Mock no missed events +} + +// IsHealthy returns true for mock +func (m *MockZoneMarkAggregator) IsHealthy() bool { + return true +} + +// GetLastEventTime returns current time for mock +func (m *MockZoneMarkAggregator) GetLastEventTime() time.Time { + return time.Now() +} diff --git a/internal/ovsexporter/ovsexporter_test.go b/internal/ovsexporter/ovsexporter_test.go index 39e08be..c140af3 100644 --- a/internal/ovsexporter/ovsexporter_test.go +++ b/internal/ovsexporter/ovsexporter_test.go @@ -5,7 +5,7 @@ package ovsexporter import ( "bytes" - "io/ioutil" + "io" "net/http" "net/http/httptest" "testing" @@ -33,7 +33,7 @@ func testCollector(t *testing.T, collector prometheus.Collector) []byte { } defer resp.Body.Close() - buf, err := ioutil.ReadAll(resp.Body) + buf, err := io.ReadAll(resp.Body) if err != nil { t.Fatalf("failed to read server response: %v", err) }