Skip to content

Commit c14cffa

Browse files
committed
implements graceful termination for server
Signed-off-by: Imran Pochi <imranpochi@microsoft.com>
1 parent ae3b7c4 commit c14cffa

File tree

2 files changed

+101
-18
lines changed

2 files changed

+101
-18
lines changed

cmd/server/app/options/options.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ type ProxyRunOptions struct {
114114
LeaseLabel string
115115
// Needs kubernetes client
116116
NeedsKubernetesClient bool
117+
// Graceful shutdown timeout duration
118+
GracefulShutdownTimeout time.Duration
117119
}
118120

119121
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -154,6 +156,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
154156
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
155157
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
156158
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
159+
flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating.")
157160
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
158161
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
159162

@@ -197,6 +200,7 @@ func (o *ProxyRunOptions) Print() {
197200
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
198201
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
199202
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
203+
klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout)
200204
}
201205

202206
func (o *ProxyRunOptions) Validate() error {
@@ -381,6 +385,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
381385
EnableLeaseController: false,
382386
LeaseNamespace: "kube-system",
383387
LeaseLabel: "k8s-app=konnectivity-server",
388+
GracefulShutdownTimeout: 15 * time.Second,
384389
}
385390
return &o
386391
}

cmd/server/app/server.go

Lines changed: 96 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ type Proxy struct {
9494
server *server.ProxyServer
9595
}
9696

97-
type StopFunc func()
97+
type StopFunc func(context.Context) error
9898

9999
func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
100100
o.Print()
@@ -144,16 +144,12 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
144144
if err != nil {
145145
return fmt.Errorf("failed to run the frontend server: %v", err)
146146
}
147-
if frontendStop != nil {
148-
defer frontendStop()
149-
}
150147

151148
klog.V(1).Infoln("Starting agent server for tunnel connections.")
152149
err = p.runAgentServer(o, p.server)
153150
if err != nil {
154151
return fmt.Errorf("failed to run the agent server: %v", err)
155152
}
156-
defer p.agentServer.Stop()
157153

158154
labels, err := util.ParseLabels(o.LeaseLabel)
159155
if err != nil {
@@ -181,17 +177,97 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
181177
if err != nil {
182178
return fmt.Errorf("failed to run the admin server: %v", err)
183179
}
184-
defer p.adminServer.Close()
185180

186181
klog.V(1).Infoln("Starting health server for healthchecks.")
187182
err = p.runHealthServer(o, p.server)
188183
if err != nil {
189184
return fmt.Errorf("failed to run the health server: %v", err)
190185
}
191-
defer p.healthServer.Close()
192186

193187
<-stopCh
194-
klog.V(1).Infoln("Shutting down server.")
188+
klog.V(1).Infoln("Received shutdown signal, initiating graceful shutdown.")
189+
190+
// Start graceful shutdown with timeout
191+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout)
192+
defer shutdownCancel()
193+
194+
// Create a WaitGroup to track shutdown of all components
195+
var wg sync.WaitGroup
196+
197+
// Shutdown frontend server gracefully (if available)
198+
if frontendStop != nil {
199+
wg.Add(1)
200+
go func() {
201+
defer wg.Done()
202+
klog.V(1).Infoln("Gracefully stopping frontend server...")
203+
if err := frontendStop(shutdownCtx); err != nil {
204+
klog.ErrorS(err, "failed to shut down frontend server")
205+
} else {
206+
klog.V(1).Infoln("frontend server stopped.")
207+
}
208+
}()
209+
}
210+
211+
// Shutdown agent server gracefully
212+
wg.Add(1)
213+
go func() {
214+
defer wg.Done()
215+
klog.V(1).Infoln("Gracefully stopping agent server...")
216+
p.agentServer.GracefulStop()
217+
klog.V(1).Infoln("agent server stopped.")
218+
}()
219+
220+
// Shutdown admin server gracefully
221+
wg.Add(1)
222+
go func() {
223+
defer wg.Done()
224+
klog.V(1).Infoln("Gracefully stopping admin server...")
225+
if err := p.adminServer.Shutdown(shutdownCtx); err != nil {
226+
klog.ErrorS(err, "failed to shut down admin server")
227+
228+
} else {
229+
klog.V(1).Infoln("admin server stopped.")
230+
}
231+
}()
232+
233+
// Shutdown health server gracefully
234+
wg.Add(1)
235+
go func() {
236+
defer wg.Done()
237+
klog.V(1).Infoln("Gracefully stopping health server...")
238+
if err := p.healthServer.Shutdown(shutdownCtx); err != nil {
239+
klog.ErrorS(err, "failed to shut down health server")
240+
} else {
241+
klog.V(1).Infoln("health server stopped.")
242+
}
243+
}()
244+
245+
// Wait for all servers to shutdown or timeout
246+
shutdownComplete := make(chan struct{})
247+
go func() {
248+
wg.Wait()
249+
close(shutdownComplete)
250+
}()
251+
252+
select {
253+
case <-shutdownComplete:
254+
klog.V(1).Infoln("Graceful shutdown completed successfully.")
255+
case <-shutdownCtx.Done():
256+
klog.Warningf("Graceful shutdown timed out after %v, forcing termination.", o.GracefulShutdownTimeout)
257+
// Force stop all servers that might still be running
258+
if p.agentServer != nil {
259+
p.agentServer.Stop()
260+
}
261+
if p.adminServer != nil {
262+
p.adminServer.Close()
263+
}
264+
if p.healthServer != nil {
265+
p.healthServer.Close()
266+
}
267+
// frontend server's force-stop is handled by its StopFunc
268+
}
269+
270+
klog.V(1).Infoln("Server shutdown complete.")
195271

196272
return nil
197273
}
@@ -259,7 +335,10 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
259335
"udsFile", o.UdsName,
260336
)
261337
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
262-
stop = grpcServer.GracefulStop
338+
stop = func(ctx context.Context) error {
339+
grpcServer.GracefulStop()
340+
return nil
341+
}
263342
} else {
264343
// http-connect
265344
server := &http.Server{
@@ -268,9 +347,8 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
268347
Server: s,
269348
},
270349
}
271-
stop = func() {
272-
err := server.Shutdown(ctx)
273-
klog.ErrorS(err, "error shutting down server")
350+
stop = func(shutdownCtx context.Context) error {
351+
return server.Shutdown(shutdownCtx)
274352
}
275353
labels := runpprof.Labels(
276354
"core", "udsHttpFrontend",
@@ -355,7 +433,10 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
355433
"port", strconv.Itoa(o.ServerPort),
356434
)
357435
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
358-
stop = grpcServer.GracefulStop
436+
stop = func(ctx context.Context) error {
437+
grpcServer.GracefulStop()
438+
return nil
439+
}
359440
} else {
360441
// http-connect
361442
server := &http.Server{
@@ -367,11 +448,8 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
367448
},
368449
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
369450
}
370-
stop = func() {
371-
err := server.Shutdown(ctx)
372-
if err != nil {
373-
klog.ErrorS(err, "failed to shutdown server")
374-
}
451+
stop = func(shutdownCtx context.Context) error {
452+
return server.Shutdown(shutdownCtx)
375453
}
376454
labels := runpprof.Labels(
377455
"core", "mtlsHttpFrontend",

0 commit comments

Comments
 (0)