diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index b213522e5..b112eaffb 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -115,6 +115,8 @@ type ProxyRunOptions struct { LeaseLabel string // Needs kubernetes client NeedsKubernetesClient bool + // Graceful shutdown timeout duration + GracefulShutdownTimeout time.Duration } func (o *ProxyRunOptions) Flags() *pflag.FlagSet { @@ -155,6 +157,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.") flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.") flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") + flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating. Set to 0 to disable graceful shutdown (default: 0).") flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.") flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.") @@ -198,6 +201,7 @@ func (o *ProxyRunOptions) Print() { klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel) klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites) klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize) + klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout) } func (o *ProxyRunOptions) Validate() error { @@ -339,6 +343,11 @@ func (o *ProxyRunOptions) Validate() error { } } + // Validate graceful shutdown timeout + if o.GracefulShutdownTimeout < 0 { + return fmt.Errorf("graceful-shutdown-timeout must be >= 0, got %v", o.GracefulShutdownTimeout) + } + o.NeedsKubernetesClient = usingServiceAccountAuth || o.EnableLeaseController return nil @@ -382,6 +391,7 @@ func NewProxyRunOptions() *ProxyRunOptions { EnableLeaseController: false, LeaseNamespace: "kube-system", LeaseLabel: "k8s-app=konnectivity-server", + GracefulShutdownTimeout: 0, } return &o } diff --git a/cmd/server/app/options/options_test.go b/cmd/server/app/options/options_test.go index 10b9146b5..b66923c2f 100644 --- a/cmd/server/app/options/options_test.go +++ b/cmd/server/app/options/options_test.go @@ -63,6 +63,7 @@ func TestDefaultServerOptions(t *testing.T) { assertDefaultValue(t, "CipherSuites", defaultServerOptions.CipherSuites, make([]string, 0)) assertDefaultValue(t, "XfrChannelSize", defaultServerOptions.XfrChannelSize, 10) assertDefaultValue(t, "APIContentType", defaultServerOptions.APIContentType, "application/vnd.kubernetes.protobuf") + assertDefaultValue(t, "GracefulShutdownTimeout", defaultServerOptions.GracefulShutdownTimeout, 0*time.Second) } @@ -168,6 +169,21 @@ func TestValidate(t *testing.T) { value: -10, expected: fmt.Errorf("channel size -10 must be greater than 0"), }, + "NegativeGracefulShutdownTimeout": { + field: "GracefulShutdownTimeout", + value: -1 * time.Second, + expected: fmt.Errorf("graceful-shutdown-timeout must be >= 0, got -1s"), + }, + "ZeroGracefulShutdownTimeout": { + field: "GracefulShutdownTimeout", + value: 0 * time.Second, + expected: nil, + }, + "PositiveGracefulShutdownTimeout": { + field: "GracefulShutdownTimeout", + value: 30 * time.Second, + expected: nil, + }, } { t.Run(desc, func(t *testing.T) { testServerOptions := NewProxyRunOptions() @@ -184,6 +200,10 @@ func TestValidate(t *testing.T) { case reflect.Int: ivalue := tc.value.(int) fv.SetInt(int64(ivalue)) + case reflect.Int64: + if duration, ok := tc.value.(time.Duration); ok { + fv.SetInt(int64(duration)) + } } } actual := testServerOptions.Validate() diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index 0e6e8f16c..7f961824b 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -95,7 +95,7 @@ type Proxy struct { server *server.ProxyServer } -type StopFunc func() +type StopFunc func(context.Context) error func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { o.Print() @@ -145,24 +145,21 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("failed to run the frontend server: %v", err) } - if frontendStop != nil { - defer frontendStop() - } klog.V(1).Infoln("Starting agent server for tunnel connections.") err = p.runAgentServer(o, p.server) if err != nil { return fmt.Errorf("failed to run the agent server: %v", err) } - defer p.agentServer.Stop() labels, err := util.ParseLabels(o.LeaseLabel) if err != nil { return err } + var leaseController *leases.Controller if o.EnableLeaseController { - leaseController := leases.NewController( + leaseController = leases.NewController( k8sClient, o.ServerID, int32(LeaseDuration.Seconds()), @@ -174,7 +171,6 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { ) klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.") leaseController.Run(ctx) - defer leaseController.Stop() } klog.V(1).Infoln("Starting admin server for debug connections.") @@ -182,18 +178,128 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("failed to run the admin server: %v", err) } - defer p.adminServer.Close() klog.V(1).Infoln("Starting health server for healthchecks.") err = p.runHealthServer(o, p.server) if err != nil { return fmt.Errorf("failed to run the health server: %v", err) } - defer p.healthServer.Close() <-stopCh klog.V(1).Infoln("Shutting down server.") + // If graceful shutdown timeout is 0, use the old behavior (immediate shutdown) + if o.GracefulShutdownTimeout == 0 { + if frontendStop != nil { + if err := frontendStop(context.Background()); err != nil { + klog.ErrorS(err, "failed to stop frontend server") + } + } + if p.agentServer != nil { + p.agentServer.Stop() + } + if p.adminServer != nil { + p.adminServer.Close() + } + if p.healthServer != nil { + p.healthServer.Close() + } + if leaseController != nil { + leaseController.Stop() + } + return nil + } + + klog.V(1).Infoln("Initiating graceful shutdown.") + + // Start graceful shutdown with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout) + defer shutdownCancel() + + // Create a WaitGroup to track shutdown of all components + var wg sync.WaitGroup + + // Add all workers to WaitGroup upfront + if frontendStop != nil { + wg.Add(1) + } + wg.Add(3) // agent, admin, health servers + + // Create completion channel before starting goroutines + shutdownComplete := make(chan struct{}) + go func() { + wg.Wait() + close(shutdownComplete) + }() + + // Shutdown frontend server gracefully (if available) + if frontendStop != nil { + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping frontend server...") + if err := frontendStop(shutdownCtx); err != nil { + klog.ErrorS(err, "failed to shut down frontend server") + } else { + klog.V(1).Infoln("frontend server stopped.") + } + }() + } + + // Shutdown agent server gracefully + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping agent server...") + p.agentServer.GracefulStop() + klog.V(1).Infoln("agent server stopped.") + }() + + // Shutdown admin server gracefully + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping admin server...") + if err := p.adminServer.Shutdown(shutdownCtx); err != nil { + klog.ErrorS(err, "failed to shut down admin server") + } else { + klog.V(1).Infoln("admin server stopped.") + } + }() + + // Shutdown health server gracefully + go func() { + defer wg.Done() + klog.V(1).Infoln("Gracefully stopping health server...") + if err := p.healthServer.Shutdown(shutdownCtx); err != nil { + klog.ErrorS(err, "failed to shut down health server") + } else { + klog.V(1).Infoln("health server stopped.") + } + }() + + // Wait for all servers to shutdown or timeout + select { + case <-shutdownComplete: + klog.V(1).Infoln("Graceful shutdown completed successfully.") + case <-shutdownCtx.Done(): + klog.Warningf("Graceful shutdown timed out after %v, forcing termination.", o.GracefulShutdownTimeout) + // Force stop all servers that might still be running + if p.agentServer != nil { + p.agentServer.Stop() + } + if p.adminServer != nil { + p.adminServer.Close() + } + if p.healthServer != nil { + p.healthServer.Close() + } + // frontend server's force-stop is handled by its StopFunc + } + + // Stop lease controller after servers have shut down + if leaseController != nil { + klog.V(1).Infoln("Stopping lease controller.") + leaseController.Stop() + } + return nil } @@ -260,7 +366,10 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt "udsFile", o.UdsName, ) go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) }) - stop = grpcServer.GracefulStop + stop = func(_ context.Context) error { + grpcServer.GracefulStop() + return nil + } } else { // http-connect server := &http.Server{ @@ -269,9 +378,8 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt Server: s, }, } - stop = func() { - err := server.Shutdown(ctx) - klog.ErrorS(err, "error shutting down server") + stop = func(shutdownCtx context.Context) error { + return server.Shutdown(shutdownCtx) } labels := runpprof.Labels( "core", "udsHttpFrontend", @@ -329,7 +437,7 @@ func (p *Proxy) getTLSConfig(caFile, certFile, keyFile string, cipherSuites []st return tlsConfig, nil } -func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { +func (p *Proxy) runMTLSFrontendServer(_ context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { var stop StopFunc var tlsConfig *tls.Config @@ -356,7 +464,10 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp "port", strconv.Itoa(o.ServerPort), ) go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) }) - stop = grpcServer.GracefulStop + stop = func(_ context.Context) error { + grpcServer.GracefulStop() + return nil + } } else { // http-connect server := &http.Server{ @@ -368,11 +479,8 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp }, TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), } - stop = func() { - err := server.Shutdown(ctx) - if err != nil { - klog.ErrorS(err, "failed to shutdown server") - } + stop = func(shutdownCtx context.Context) error { + return server.Shutdown(shutdownCtx) } labels := runpprof.Labels( "core", "mtlsHttpFrontend",