Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type ProxyRunOptions struct {
LeaseLabel string
// Needs kubernetes client
NeedsKubernetesClient bool
// Graceful shutdown timeout duration
GracefulShutdownTimeout time.Duration
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand Down Expand Up @@ -155,6 +157,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating. Set to 0 to disable graceful shutdown (default: 0).")
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")

Expand Down Expand Up @@ -198,6 +201,7 @@ func (o *ProxyRunOptions) Print() {
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout)
}

func (o *ProxyRunOptions) Validate() error {
Expand Down Expand Up @@ -339,6 +343,11 @@ func (o *ProxyRunOptions) Validate() error {
}
}

// Validate graceful shutdown timeout
if o.GracefulShutdownTimeout < 0 {
return fmt.Errorf("graceful-shutdown-timeout must be >= 0, got %v", o.GracefulShutdownTimeout)
}

o.NeedsKubernetesClient = usingServiceAccountAuth || o.EnableLeaseController

return nil
Expand Down Expand Up @@ -382,6 +391,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
EnableLeaseController: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
GracefulShutdownTimeout: 0,
}
return &o
}
Expand Down
20 changes: 20 additions & 0 deletions cmd/server/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestDefaultServerOptions(t *testing.T) {
assertDefaultValue(t, "CipherSuites", defaultServerOptions.CipherSuites, make([]string, 0))
assertDefaultValue(t, "XfrChannelSize", defaultServerOptions.XfrChannelSize, 10)
assertDefaultValue(t, "APIContentType", defaultServerOptions.APIContentType, "application/vnd.kubernetes.protobuf")
assertDefaultValue(t, "GracefulShutdownTimeout", defaultServerOptions.GracefulShutdownTimeout, 0*time.Second)

}

Expand Down Expand Up @@ -168,6 +169,21 @@ func TestValidate(t *testing.T) {
value: -10,
expected: fmt.Errorf("channel size -10 must be greater than 0"),
},
"NegativeGracefulShutdownTimeout": {
field: "GracefulShutdownTimeout",
value: -1 * time.Second,
expected: fmt.Errorf("graceful-shutdown-timeout must be >= 0, got -1s"),
},
"ZeroGracefulShutdownTimeout": {
field: "GracefulShutdownTimeout",
value: 0 * time.Second,
expected: nil,
},
"PositiveGracefulShutdownTimeout": {
field: "GracefulShutdownTimeout",
value: 30 * time.Second,
expected: nil,
},
} {
t.Run(desc, func(t *testing.T) {
testServerOptions := NewProxyRunOptions()
Expand All @@ -184,6 +200,10 @@ func TestValidate(t *testing.T) {
case reflect.Int:
ivalue := tc.value.(int)
fv.SetInt(int64(ivalue))
case reflect.Int64:
if duration, ok := tc.value.(time.Duration); ok {
fv.SetInt(int64(duration))
}
}
}
actual := testServerOptions.Validate()
Expand Down
148 changes: 128 additions & 20 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Proxy struct {
server *server.ProxyServer
}

type StopFunc func()
type StopFunc func(context.Context) error

func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
o.Print()
Expand Down Expand Up @@ -145,24 +145,21 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("failed to run the frontend server: %v", err)
}
if frontendStop != nil {
defer frontendStop()
}

klog.V(1).Infoln("Starting agent server for tunnel connections.")
err = p.runAgentServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the agent server: %v", err)
}
defer p.agentServer.Stop()

labels, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}

var leaseController *leases.Controller
if o.EnableLeaseController {
leaseController := leases.NewController(
leaseController = leases.NewController(
k8sClient,
o.ServerID,
int32(LeaseDuration.Seconds()),
Expand All @@ -174,26 +171,135 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
)
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
leaseController.Run(ctx)
defer leaseController.Stop()
}

klog.V(1).Infoln("Starting admin server for debug connections.")
err = p.runAdminServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the admin server: %v", err)
}
defer p.adminServer.Close()

klog.V(1).Infoln("Starting health server for healthchecks.")
err = p.runHealthServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the health server: %v", err)
}
defer p.healthServer.Close()

<-stopCh
klog.V(1).Infoln("Shutting down server.")

// If graceful shutdown timeout is 0, use the old behavior (immediate shutdown)
if o.GracefulShutdownTimeout == 0 {
if frontendStop != nil {
if err := frontendStop(context.Background()); err != nil {
klog.ErrorS(err, "failed to stop frontend server")
}
}
if p.agentServer != nil {
p.agentServer.Stop()
}
if p.adminServer != nil {
p.adminServer.Close()
}
if p.healthServer != nil {
p.healthServer.Close()
}
if leaseController != nil {
leaseController.Stop()
}
return nil
}

klog.V(1).Infoln("Initiating graceful shutdown.")

// Start graceful shutdown with timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout)
defer shutdownCancel()

// Create a WaitGroup to track shutdown of all components
var wg sync.WaitGroup

// Add all workers to WaitGroup upfront
if frontendStop != nil {
wg.Add(1)
}
wg.Add(3) // agent, admin, health servers

// Create completion channel before starting goroutines
shutdownComplete := make(chan struct{})
go func() {
wg.Wait()
close(shutdownComplete)
}()

// Shutdown frontend server gracefully (if available)
if frontendStop != nil {
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping frontend server...")
if err := frontendStop(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down frontend server")
} else {
klog.V(1).Infoln("frontend server stopped.")
}
}()
}

// Shutdown agent server gracefully
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping agent server...")
p.agentServer.GracefulStop()
klog.V(1).Infoln("agent server stopped.")
}()

// Shutdown admin server gracefully
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping admin server...")
if err := p.adminServer.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down admin server")
} else {
klog.V(1).Infoln("admin server stopped.")
}
}()

// Shutdown health server gracefully
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping health server...")
if err := p.healthServer.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down health server")
} else {
klog.V(1).Infoln("health server stopped.")
}
}()

// Wait for all servers to shutdown or timeout
select {
case <-shutdownComplete:
klog.V(1).Infoln("Graceful shutdown completed successfully.")
case <-shutdownCtx.Done():
klog.Warningf("Graceful shutdown timed out after %v, forcing termination.", o.GracefulShutdownTimeout)
// Force stop all servers that might still be running
if p.agentServer != nil {
p.agentServer.Stop()
}
if p.adminServer != nil {
p.adminServer.Close()
}
if p.healthServer != nil {
p.healthServer.Close()
}
// frontend server's force-stop is handled by its StopFunc
}

// Stop lease controller after servers have shut down
if leaseController != nil {
klog.V(1).Infoln("Stopping lease controller.")
leaseController.Stop()
}

return nil
}

Expand Down Expand Up @@ -260,7 +366,10 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
"udsFile", o.UdsName,
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
stop = func(_ context.Context) error {
grpcServer.GracefulStop()
return nil
}
} else {
// http-connect
server := &http.Server{
Expand All @@ -269,9 +378,8 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
Server: s,
},
}
stop = func() {
err := server.Shutdown(ctx)
klog.ErrorS(err, "error shutting down server")
stop = func(shutdownCtx context.Context) error {
return server.Shutdown(shutdownCtx)
}
labels := runpprof.Labels(
"core", "udsHttpFrontend",
Expand Down Expand Up @@ -329,7 +437,7 @@ func (p *Proxy) getTLSConfig(caFile, certFile, keyFile string, cipherSuites []st
return tlsConfig, nil
}

func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) {
func (p *Proxy) runMTLSFrontendServer(_ context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) {
var stop StopFunc

var tlsConfig *tls.Config
Expand All @@ -356,7 +464,10 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
"port", strconv.Itoa(o.ServerPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
stop = func(_ context.Context) error {
grpcServer.GracefulStop()
return nil
}
} else {
// http-connect
server := &http.Server{
Expand All @@ -368,11 +479,8 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
},
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
}
stop = func() {
err := server.Shutdown(ctx)
if err != nil {
klog.ErrorS(err, "failed to shutdown server")
}
stop = func(shutdownCtx context.Context) error {
return server.Shutdown(shutdownCtx)
}
labels := runpprof.Labels(
"core", "mtlsHttpFrontend",
Expand Down