Skip to content

Commit 3dc3c4f

Browse files
committed
OPTIM/MEDIUM: Update Ingress status in a seperate goroutine
Updating status in Ingress resources is subject to delays, network issues, etc. Doing it in the main IC goroutine may halter IC execution and impact controller performance as reported here in this issue: #312
1 parent de4ccd8 commit 3dc3c4f

File tree

5 files changed

+174
-144
lines changed

5 files changed

+174
-144
lines changed

controller/controller.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/haproxytech/kubernetes-ingress/controller/haproxy"
2626
"github.com/haproxytech/kubernetes-ingress/controller/haproxy/api"
2727
"github.com/haproxytech/kubernetes-ingress/controller/route"
28+
"github.com/haproxytech/kubernetes-ingress/controller/status"
2829
"github.com/haproxytech/kubernetes-ingress/controller/store"
2930
"github.com/haproxytech/kubernetes-ingress/controller/utils"
3031
)
@@ -35,9 +36,10 @@ type HAProxyController struct {
3536
Client api.HAProxyClient
3637
OSArgs utils.OSArgs
3738
Store store.K8s
38-
PublishService *store.Service
39+
PublishService *utils.NamespaceValue
3940
AuxCfgModTime int64
4041
eventChan chan SyncDataEvent
42+
statusChan chan status.SyncIngress
4143
k8s *K8s
4244
ready bool
4345
reload bool
@@ -83,11 +85,9 @@ func (c *HAProxyController) Start() {
8385
// Controller PublishService
8486
parts := strings.Split(c.OSArgs.PublishService, "/")
8587
if len(parts) == 2 {
86-
c.PublishService = &store.Service{
88+
c.PublishService = &utils.NamespaceValue{
8789
Namespace: parts[0],
8890
Name: parts[1],
89-
Status: EMPTY,
90-
Addresses: []string{},
9191
}
9292
}
9393

@@ -113,6 +113,11 @@ func (c *HAProxyController) Start() {
113113
// Monitor k8s events
114114
c.eventChan = make(chan SyncDataEvent, watch.DefaultChanSize*6)
115115
go c.monitorChanges()
116+
if c.PublishService != nil {
117+
// Update Ingress status
118+
c.statusChan = make(chan status.SyncIngress, watch.DefaultChanSize*6)
119+
go status.UpdateIngress(c.k8s.API, c.Store, c.statusChan)
120+
}
116121
}
117122

118123
// Stop handles shutting down HAProxyController
@@ -156,8 +161,12 @@ func (c *HAProxyController) updateHAProxy() {
156161
logger.Debugf("ingress '%s/%s' ignored: no matching IngressClass", ingress.Namespace, ingress.Name)
157162
continue
158163
}
159-
if c.PublishService != nil {
160-
logger.Error(c.k8s.UpdateIngressStatus(ingress, c.PublishService))
164+
if c.PublishService != nil && ingress.Status == ADDED {
165+
select {
166+
case c.statusChan <- status.SyncIngress{Ingress: ingress}:
167+
default:
168+
logger.Errorf("Ingress %s/%s: unable to sync status: sync channel full", ingress.Namespace, ingress.Name)
169+
}
161170
}
162171
if ingress.DefaultBackend != nil {
163172
if reload, err = c.setDefaultService(ingress, []string{c.Cfg.FrontHTTP, c.Cfg.FrontHTTPS}); err != nil {
@@ -271,9 +280,6 @@ func (c *HAProxyController) setToReady() {
271280
// clean controller state
272281
func (c *HAProxyController) clean(failedSync bool) {
273282
logger.Error(c.Cfg.Clean())
274-
if c.PublishService != nil {
275-
c.PublishService.Status = EMPTY
276-
}
277283
c.Cfg.SSLPassthrough = false
278284
if !failedSync {
279285
c.Store.Clean()

controller/kubernetes.go

Lines changed: 11 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,15 @@
1515
package controller
1616

1717
import (
18-
"context"
1918
"errors"
20-
"fmt"
21-
"net"
2219

2320
corev1 "k8s.io/api/core/v1"
24-
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
25-
networkingv1 "k8s.io/api/networking/v1"
26-
networkingv1beta "k8s.io/api/networking/v1beta1"
27-
k8serror "k8s.io/apimachinery/pkg/api/errors"
28-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2921
"k8s.io/client-go/kubernetes"
3022
"k8s.io/client-go/rest"
3123
"k8s.io/client-go/tools/cache"
3224
"k8s.io/client-go/tools/clientcmd"
3325

26+
ingstatus "github.com/haproxytech/kubernetes-ingress/controller/status"
3427
"github.com/haproxytech/kubernetes-ingress/controller/store"
3528
"github.com/haproxytech/kubernetes-ingress/controller/utils"
3629
)
@@ -343,7 +336,7 @@ func (k *K8s) EventsIngresses(channel chan SyncDataEvent, stop chan struct{}, in
343336
go informer.Run(stop)
344337
}
345338

346-
func (k *K8s) EventsServices(channel chan SyncDataEvent, stop chan struct{}, informer cache.SharedIndexInformer, publishSvc *store.Service) {
339+
func (k *K8s) EventsServices(channel chan SyncDataEvent, ingChan chan ingstatus.SyncIngress, stop chan struct{}, informer cache.SharedIndexInformer, publishSvc *utils.NamespaceValue) {
347340
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
348341
AddFunc: func(obj interface{}) {
349342
data, ok := obj.(*corev1.Service)
@@ -378,13 +371,11 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, stop chan struct{}, inf
378371
Port: int64(sp.Port),
379372
})
380373
}
381-
if publishSvc != nil {
382-
if publishSvc.Namespace == item.Namespace && publishSvc.Name == item.Name {
383-
k.GetPublishServiceAddresses(data, publishSvc)
384-
}
385-
}
386374
k.Logger.Tracef("%s %s: %s", SERVICE, item.Status, item.Name)
387375
channel <- SyncDataEvent{SyncType: SERVICE, Namespace: item.Namespace, Data: item}
376+
if publishSvc != nil && publishSvc.Namespace == data.Namespace && publishSvc.Name == data.Name {
377+
ingChan <- ingstatus.SyncIngress{Service: data}
378+
}
388379
},
389380
DeleteFunc: func(obj interface{}) {
390381
data, ok := obj.(*corev1.Service)
@@ -406,13 +397,11 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, stop chan struct{}, inf
406397
if data.Spec.Type == corev1.ServiceTypeExternalName {
407398
item.DNS = data.Spec.ExternalName
408399
}
409-
if publishSvc != nil {
410-
if publishSvc.Namespace == item.Namespace && publishSvc.Name == item.Name {
411-
publishSvc.Status = DELETED
412-
}
413-
}
414400
k.Logger.Tracef("%s %s: %s", SERVICE, item.Status, item.Name)
415401
channel <- SyncDataEvent{SyncType: SERVICE, Namespace: item.Namespace, Data: item}
402+
if publishSvc != nil && publishSvc.Namespace == data.Namespace && publishSvc.Name == data.Name {
403+
ingChan <- ingstatus.SyncIngress{Service: data}
404+
}
416405
},
417406
UpdateFunc: func(oldObj, newObj interface{}) {
418407
data1, ok := oldObj.(*corev1.Service)
@@ -433,6 +422,9 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, stop chan struct{}, inf
433422
k.Logger.Tracef("forwarding to ExternalName Services for %v is disabled", data2)
434423
return
435424
}
425+
if publishSvc != nil && publishSvc.Namespace == data2.Namespace && publishSvc.Name == data2.Name {
426+
ingChan <- ingstatus.SyncIngress{Service: data2}
427+
}
436428
status := MODIFIED
437429
item1 := &store.Service{
438430
Namespace: data1.GetNamespace(),
@@ -474,11 +466,6 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, stop chan struct{}, inf
474466
if item2.Equal(item1) {
475467
return
476468
}
477-
if publishSvc != nil {
478-
if publishSvc.Namespace == item2.Namespace && publishSvc.Name == item2.Name {
479-
k.GetPublishServiceAddresses(data2, publishSvc)
480-
}
481-
}
482469
k.Logger.Tracef("%s %s: %s", SERVICE, item2.Status, item2.Name)
483470
channel <- SyncDataEvent{SyncType: SERVICE, Namespace: item2.Namespace, Data: item2}
484471
},
@@ -634,116 +621,6 @@ func (k *K8s) EventsSecrets(channel chan SyncDataEvent, stop chan struct{}, info
634621
go informer.Run(stop)
635622
}
636623

637-
func (k *K8s) UpdateIngressStatus(ingress *store.Ingress, publishSvc *store.Service) (err error) {
638-
var status store.Status
639-
640-
if status = publishSvc.Status; status == EMPTY {
641-
if ingress.Status == EMPTY {
642-
return nil
643-
}
644-
status = ingress.Status
645-
}
646-
647-
var lbi []corev1.LoadBalancerIngress
648-
649-
// Update addresses
650-
if status == ADDED || status == MODIFIED {
651-
for _, addr := range publishSvc.Addresses {
652-
if net.ParseIP(addr) == nil {
653-
lbi = append(lbi, corev1.LoadBalancerIngress{Hostname: addr})
654-
} else {
655-
lbi = append(lbi, corev1.LoadBalancerIngress{IP: addr})
656-
}
657-
}
658-
}
659-
660-
switch ingress.APIVersion {
661-
// Required for Kubernetes < 1.14
662-
case "extensions/v1beta1":
663-
var ingSource *extensionsv1beta1.Ingress
664-
ingSource, err = k.API.ExtensionsV1beta1().Ingresses(ingress.Namespace).Get(context.Background(), ingress.Name, metav1.GetOptions{})
665-
if err != nil {
666-
break
667-
}
668-
ingCopy := ingSource.DeepCopy()
669-
ingCopy.Status = extensionsv1beta1.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: lbi}}
670-
_, err = k.API.ExtensionsV1beta1().Ingresses(ingress.Namespace).UpdateStatus(context.Background(), ingCopy, metav1.UpdateOptions{})
671-
// Required for Kubernetes < 1.19
672-
case "networking.k8s.io/v1beta1":
673-
var ingSource *networkingv1beta.Ingress
674-
ingSource, err = k.API.NetworkingV1beta1().Ingresses(ingress.Namespace).Get(context.Background(), ingress.Name, metav1.GetOptions{})
675-
if err != nil {
676-
break
677-
}
678-
ingCopy := ingSource.DeepCopy()
679-
ingCopy.Status = networkingv1beta.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: lbi}}
680-
_, err = k.API.NetworkingV1beta1().Ingresses(ingress.Namespace).UpdateStatus(context.Background(), ingCopy, metav1.UpdateOptions{})
681-
case "networking.k8s.io/v1":
682-
var ingSource *networkingv1.Ingress
683-
ingSource, err = k.API.NetworkingV1().Ingresses(ingress.Namespace).Get(context.Background(), ingress.Name, metav1.GetOptions{})
684-
if err != nil {
685-
break
686-
}
687-
ingCopy := ingSource.DeepCopy()
688-
ingCopy.Status = networkingv1.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: lbi}}
689-
_, err = k.API.NetworkingV1().Ingresses(ingress.Namespace).UpdateStatus(context.Background(), ingCopy, metav1.UpdateOptions{})
690-
}
691-
692-
if k8serror.IsNotFound(err) {
693-
return fmt.Errorf("update ingress status: failed to get ingress %s/%s: %w", ingress.Namespace, ingress.Name, err)
694-
}
695-
if err != nil {
696-
return fmt.Errorf("failed to update LoadBalancer status of ingress %s/%s: %w", ingress.Namespace, ingress.Name, err)
697-
}
698-
k.Logger.Debugf("Successful update of LoadBalancer status in ingress %s/%s", ingress.Namespace, ingress.Name)
699-
700-
return nil
701-
}
702-
703-
func (k *K8s) GetPublishServiceAddresses(service *corev1.Service, publishSvc *store.Service) {
704-
addresses := []string{}
705-
switch service.Spec.Type {
706-
case corev1.ServiceTypeExternalName:
707-
addresses = []string{service.Spec.ExternalName}
708-
case corev1.ServiceTypeClusterIP:
709-
addresses = []string{service.Spec.ClusterIP}
710-
case corev1.ServiceTypeNodePort:
711-
if service.Spec.ExternalIPs != nil {
712-
addresses = append(addresses, service.Spec.ExternalIPs...)
713-
} else {
714-
addresses = append(addresses, service.Spec.ClusterIP)
715-
}
716-
case corev1.ServiceTypeLoadBalancer:
717-
for _, ip := range service.Status.LoadBalancer.Ingress {
718-
if ip.IP == "" {
719-
addresses = append(addresses, ip.Hostname)
720-
} else {
721-
addresses = append(addresses, ip.IP)
722-
}
723-
}
724-
addresses = append(addresses, service.Spec.ExternalIPs...)
725-
default:
726-
k.Logger.Tracef("Unable to extract IP address/es from service %v", service)
727-
return
728-
}
729-
730-
equal := false
731-
if len(publishSvc.Addresses) == len(addresses) {
732-
equal = true
733-
for i, address := range publishSvc.Addresses {
734-
if address != publishSvc.Addresses[i] {
735-
equal = false
736-
break
737-
}
738-
}
739-
}
740-
if equal {
741-
return
742-
}
743-
publishSvc.Addresses = addresses
744-
publishSvc.Status = MODIFIED
745-
}
746-
747624
func (k *K8s) IsNetworkingV1Beta1ApiSupported() bool {
748625
vi, _ := k.API.Discovery().ServerVersion()
749626
major, _ := utils.ParseInt(vi.Major)

controller/monitor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (c *HAProxyController) monitorChanges() {
4747
c.k8s.EventsEndpoints(c.eventChan, stop, pi)
4848

4949
svci := factory.Core().V1().Services().Informer()
50-
c.k8s.EventsServices(c.eventChan, stop, svci, c.PublishService)
50+
c.k8s.EventsServices(c.eventChan, c.statusChan, stop, svci, c.PublishService)
5151

5252
nsi := factory.Core().V1().Namespaces().Informer()
5353
c.k8s.EventsNamespaces(c.eventChan, stop, nsi)

0 commit comments

Comments
 (0)