From dedeefa70b7fd228fc1ed49c443151f9fd2bbabd Mon Sep 17 00:00:00 2001 From: Imran Pochi Date: Mon, 27 Oct 2025 19:02:05 +0000 Subject: [PATCH 1/4] implements graceful termination for server Signed-off-by: Imran Pochi --- cmd/server/app/options/options.go | 5 ++ cmd/server/app/server.go | 114 +++++++++++++++++++++++++----- 2 files changed, 101 insertions(+), 18 deletions(-) diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index b213522e5..525e07a28 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -115,6 +115,8 @@ type ProxyRunOptions struct { LeaseLabel string // Needs kubernetes client NeedsKubernetesClient bool + // Graceful shutdown timeout duration + GracefulShutdownTimeout time.Duration } func (o *ProxyRunOptions) Flags() *pflag.FlagSet { @@ -155,6 +157,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.") flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.") flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") + flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating.") flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.") flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. 
This flag will be removed in a future release.") @@ -198,6 +201,7 @@ func (o *ProxyRunOptions) Print() { klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel) klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites) klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize) + klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout) } func (o *ProxyRunOptions) Validate() error { @@ -382,6 +386,7 @@ func NewProxyRunOptions() *ProxyRunOptions { EnableLeaseController: false, LeaseNamespace: "kube-system", LeaseLabel: "k8s-app=konnectivity-server", + GracefulShutdownTimeout: 15 * time.Second, } return &o } diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index 0e6e8f16c..61426c1ff 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -95,7 +95,7 @@ type Proxy struct { server *server.ProxyServer } -type StopFunc func() +type StopFunc func(context.Context) error func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { o.Print() @@ -145,16 +145,12 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("failed to run the frontend server: %v", err) } - if frontendStop != nil { - defer frontendStop() - } klog.V(1).Infoln("Starting agent server for tunnel connections.") err = p.runAgentServer(o, p.server) if err != nil { return fmt.Errorf("failed to run the agent server: %v", err) } - defer p.agentServer.Stop() labels, err := util.ParseLabels(o.LeaseLabel) if err != nil { @@ -182,17 +178,97 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("failed to run the admin server: %v", err) } - defer p.adminServer.Close() klog.V(1).Infoln("Starting health server for healthchecks.") err = p.runHealthServer(o, p.server) if err != nil { return fmt.Errorf("failed to run the health server: %v", err) } - defer p.healthServer.Close() <-stopCh - klog.V(1).Infoln("Shutting 
down server.") + klog.V(1).Infoln("Received shutdown signal, initiating graceful shutdown.") + + // Start graceful shutdown with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout) + defer shutdownCancel() + + // Create a WaitGroup to track shutdown of all components + var wg sync.WaitGroup + + // Shutdown frontend server gracefully (if available) + if frontendStop != nil { + wg.Add(1) + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping frontend server...") + if err := frontendStop(shutdownCtx); err != nil { + klog.ErrorS(err, "failed to shut down frontend server") + } else { + klog.V(1).Infoln("frontend server stopped.") + } + }() + } + + // Shutdown agent server gracefully + wg.Add(1) + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping agent server...") + p.agentServer.GracefulStop() + klog.V(1).Infoln("agent server stopped.") + }() + + // Shutdown admin server gracefully + wg.Add(1) + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping admin server...") + if err := p.adminServer.Shutdown(shutdownCtx); err != nil { + klog.ErrorS(err, "failed to shut down admin server") + + } else { + klog.V(1).Infoln("admin server stopped.") + } + }() + + // Shutdown health server gracefully + wg.Add(1) + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping health server...") + if err := p.healthServer.Shutdown(shutdownCtx); err != nil { + klog.ErrorS(err, "failed to shut down health server") + } else { + klog.V(1).Infoln("health server stopped.") + } + }() + + // Wait for all servers to shutdown or timeout + shutdownComplete := make(chan struct{}) + go func() { + wg.Wait() + close(shutdownComplete) + }() + + select { + case <-shutdownComplete: + klog.V(1).Infoln("Graceful shutdown completed successfully.") + case <-shutdownCtx.Done(): + klog.Warningf("Graceful shutdown timed out after %v, forcing termination.", o.GracefulShutdownTimeout) + // Force 
stop all servers that might still be running + if p.agentServer != nil { + p.agentServer.Stop() + } + if p.adminServer != nil { + p.adminServer.Close() + } + if p.healthServer != nil { + p.healthServer.Close() + } + // frontend server's force-stop is handled by its StopFunc + } + + klog.V(1).Infoln("Server shutdown complete.") return nil } @@ -260,7 +336,10 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt "udsFile", o.UdsName, ) go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) }) - stop = grpcServer.GracefulStop + stop = func(ctx context.Context) error { + grpcServer.GracefulStop() + return nil + } } else { // http-connect server := &http.Server{ @@ -269,9 +348,8 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt Server: s, }, } - stop = func() { - err := server.Shutdown(ctx) - klog.ErrorS(err, "error shutting down server") + stop = func(shutdownCtx context.Context) error { + return server.Shutdown(shutdownCtx) } labels := runpprof.Labels( "core", "udsHttpFrontend", @@ -356,7 +434,10 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp "port", strconv.Itoa(o.ServerPort), ) go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) }) - stop = grpcServer.GracefulStop + stop = func(ctx context.Context) error { + grpcServer.GracefulStop() + return nil + } } else { // http-connect server := &http.Server{ @@ -368,11 +449,8 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp }, TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), } - stop = func() { - err := server.Shutdown(ctx) - if err != nil { - klog.ErrorS(err, "failed to shutdown server") - } + stop = func(shutdownCtx context.Context) error { + return server.Shutdown(shutdownCtx) } labels := runpprof.Labels( "core", "mtlsHttpFrontend", From 2dfa6b73993876ada8031e9bd3234f3932e77c0a Mon Sep 17 
00:00:00 2001 From: Imran Pochi Date: Wed, 29 Oct 2025 21:11:16 +0000 Subject: [PATCH 2/4] fix golangci-lint failures Signed-off-by: Imran Pochi --- cmd/server/app/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index 61426c1ff..d09baf167 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -336,7 +336,7 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt "udsFile", o.UdsName, ) go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) }) - stop = func(ctx context.Context) error { + stop = func(_ context.Context) error { grpcServer.GracefulStop() return nil } @@ -407,7 +407,7 @@ func (p *Proxy) getTLSConfig(caFile, certFile, keyFile string, cipherSuites []st return tlsConfig, nil } -func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { +func (p *Proxy) runMTLSFrontendServer(_ context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { var stop StopFunc var tlsConfig *tls.Config @@ -434,7 +434,7 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp "port", strconv.Itoa(o.ServerPort), ) go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) }) - stop = func(ctx context.Context) error { + stop = func(_ context.Context) error { grpcServer.GracefulStop() return nil } From cc87190ed6b678186c1119cb62196a0d4000f249 Mon Sep 17 00:00:00 2001 From: Imran Pochi Date: Tue, 11 Nov 2025 21:15:55 +0000 Subject: [PATCH 3/4] addressing review Signed-off-by: Imran Pochi --- cmd/server/app/options/options.go | 9 +++-- cmd/server/app/options/options_test.go | 20 +++++++++++ cmd/server/app/server.go | 49 ++++++++++++++++++-------- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/cmd/server/app/options/options.go 
b/cmd/server/app/options/options.go index 525e07a28..b112eaffb 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -157,7 +157,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.") flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.") flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") - flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating.") + flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating. Set to 0 to disable graceful shutdown (default: 0).") flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.") flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. 
This flag will be removed in a future release.") @@ -343,6 +343,11 @@ func (o *ProxyRunOptions) Validate() error { } } + // Validate graceful shutdown timeout + if o.GracefulShutdownTimeout < 0 { + return fmt.Errorf("graceful-shutdown-timeout must be >= 0, got %v", o.GracefulShutdownTimeout) + } + o.NeedsKubernetesClient = usingServiceAccountAuth || o.EnableLeaseController return nil @@ -386,7 +391,7 @@ func NewProxyRunOptions() *ProxyRunOptions { EnableLeaseController: false, LeaseNamespace: "kube-system", LeaseLabel: "k8s-app=konnectivity-server", - GracefulShutdownTimeout: 15 * time.Second, + GracefulShutdownTimeout: 0, } return &o } diff --git a/cmd/server/app/options/options_test.go b/cmd/server/app/options/options_test.go index 10b9146b5..b66923c2f 100644 --- a/cmd/server/app/options/options_test.go +++ b/cmd/server/app/options/options_test.go @@ -63,6 +63,7 @@ func TestDefaultServerOptions(t *testing.T) { assertDefaultValue(t, "CipherSuites", defaultServerOptions.CipherSuites, make([]string, 0)) assertDefaultValue(t, "XfrChannelSize", defaultServerOptions.XfrChannelSize, 10) assertDefaultValue(t, "APIContentType", defaultServerOptions.APIContentType, "application/vnd.kubernetes.protobuf") + assertDefaultValue(t, "GracefulShutdownTimeout", defaultServerOptions.GracefulShutdownTimeout, 0*time.Second) } @@ -168,6 +169,21 @@ func TestValidate(t *testing.T) { value: -10, expected: fmt.Errorf("channel size -10 must be greater than 0"), }, + "NegativeGracefulShutdownTimeout": { + field: "GracefulShutdownTimeout", + value: -1 * time.Second, + expected: fmt.Errorf("graceful-shutdown-timeout must be >= 0, got -1s"), + }, + "ZeroGracefulShutdownTimeout": { + field: "GracefulShutdownTimeout", + value: 0 * time.Second, + expected: nil, + }, + "PositiveGracefulShutdownTimeout": { + field: "GracefulShutdownTimeout", + value: 30 * time.Second, + expected: nil, + }, } { t.Run(desc, func(t *testing.T) { testServerOptions := NewProxyRunOptions() @@ -184,6 +200,10 @@ func 
TestValidate(t *testing.T) { case reflect.Int: ivalue := tc.value.(int) fv.SetInt(int64(ivalue)) + case reflect.Int64: + if duration, ok := tc.value.(time.Duration); ok { + fv.SetInt(int64(duration)) + } } } actual := testServerOptions.Validate() diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index d09baf167..bffe9f70e 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -186,7 +186,28 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { } <-stopCh - klog.V(1).Infoln("Received shutdown signal, initiating graceful shutdown.") + klog.V(1).Infoln("Shutting down server.") + + // If graceful shutdown timeout is 0, use the old behavior (immediate shutdown) + if o.GracefulShutdownTimeout == 0 { + if frontendStop != nil { + if err := frontendStop(context.Background()); err != nil { + klog.ErrorS(err, "failed to stop frontend server") + } + } + if p.agentServer != nil { + p.agentServer.Stop() + } + if p.adminServer != nil { + p.adminServer.Close() + } + if p.healthServer != nil { + p.healthServer.Close() + } + return nil + } + + klog.V(1).Infoln("Initiating graceful shutdown.") // Start graceful shutdown with timeout shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout) @@ -195,9 +216,21 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { // Create a WaitGroup to track shutdown of all components var wg sync.WaitGroup - // Shutdown frontend server gracefully (if available) + // Add all workers to WaitGroup upfront if frontendStop != nil { wg.Add(1) + } + wg.Add(3) // agent, admin, health servers + + // Create completion channel before starting goroutines + shutdownComplete := make(chan struct{}) + go func() { + wg.Wait() + close(shutdownComplete) + }() + + // Shutdown frontend server gracefully (if available) + if frontendStop != nil { go func() { defer wg.Done() klog.V(1).Infoln("Gracefully stopping frontend server...") @@ -210,7 
+243,6 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { } // Shutdown agent server gracefully - wg.Add(1) go func() { defer wg.Done() klog.V(1).Infoln("Gracefully stopping agent server...") @@ -219,20 +251,17 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { }() // Shutdown admin server gracefully - wg.Add(1) go func() { defer wg.Done() klog.V(1).Infoln("Gracefully stopping admin server...") if err := p.adminServer.Shutdown(shutdownCtx); err != nil { klog.ErrorS(err, "failed to shut down admin server") - } else { klog.V(1).Infoln("admin server stopped.") } }() // Shutdown health server gracefully - wg.Add(1) go func() { defer wg.Done() klog.V(1).Infoln("Gracefully stopping health server...") @@ -244,12 +273,6 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { }() // Wait for all servers to shutdown or timeout - shutdownComplete := make(chan struct{}) - go func() { - wg.Wait() - close(shutdownComplete) - }() - select { case <-shutdownComplete: klog.V(1).Infoln("Graceful shutdown completed successfully.") @@ -268,8 +291,6 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { // frontend server's force-stop is handled by its StopFunc } - klog.V(1).Infoln("Server shutdown complete.") - return nil } From e9575bf876cdc3bca2aaf176e14fb5c1ec1e11a7 Mon Sep 17 00:00:00 2001 From: Imran Pochi Date: Tue, 11 Nov 2025 21:29:23 +0000 Subject: [PATCH 4/4] shut down leaseController gracefully if leaseController is enabled and graceful shutdown is enabled as well. Without this, the lease controller will terminate while the servers continue to shut down gracefully, potentially causing a situation where the leases are deleted while the existing connections are still being served up to the shutdown point.
Signed-off-by: Imran Pochi --- cmd/server/app/server.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index bffe9f70e..7f961824b 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -157,8 +157,9 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { return err } + var leaseController *leases.Controller if o.EnableLeaseController { - leaseController := leases.NewController( + leaseController = leases.NewController( k8sClient, o.ServerID, int32(LeaseDuration.Seconds()), @@ -170,7 +171,6 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { ) klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.") leaseController.Run(ctx) - defer leaseController.Stop() } klog.V(1).Infoln("Starting admin server for debug connections.") @@ -204,6 +204,9 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { if p.healthServer != nil { p.healthServer.Close() } + if leaseController != nil { + leaseController.Stop() + } return nil } @@ -291,6 +294,12 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { // frontend server's force-stop is handled by its StopFunc } + // Stop lease controller after servers have shut down + if leaseController != nil { + klog.V(1).Infoln("Stopping lease controller.") + leaseController.Stop() + } + return nil }