Skip to content

Commit ce92599

Browse files
authored
Merge pull request kmesh-net#1021 from YaoZengzeng/workload-waypoint
handle hostname type waypoint in workload
2 parents 5b6fb11 + 279951e commit ce92599

File tree

6 files changed

+618
-248
lines changed

6 files changed

+618
-248
lines changed

pkg/controller/workload/cache/service_cache.go

Lines changed: 1 addition & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -23,104 +23,24 @@ import (
2323
"kmesh.net/kmesh/pkg/logger"
2424
)
2525

26-
var log = logger.NewLoggerScope("service_cache")
26+
var log = logger.NewLoggerScope("cache")
2727

2828
type ServiceCache interface {
2929
List() []*workloadapi.Service
3030
AddOrUpdateService(svc *workloadapi.Service)
3131
DeleteService(resourceName string)
3232
GetService(resourceName string) *workloadapi.Service
33-
RefreshWaypoint(svc *workloadapi.Service) []*workloadapi.Service
3433
}
3534

3635
type serviceCache struct {
3736
mutex sync.RWMutex
3837
// keyed by namespace/hostname->service
3938
servicesByResourceName map[string]*workloadapi.Service
40-
41-
// NOTE: The following data structure is used to change the waypoint
42-
// address of type hostname in the service to type ip address. Because of
43-
// the order in which services are processed, it may be possible that corresponding
44-
// waypoint service can't be found when processing the service. The waypoint associated
45-
// with a service can also changed at any time, so we need the following maps to track
46-
// the relationship between service and its waypoint.
47-
48-
// Used to track a waypoint and all services associated with it.
49-
// Keyed by waypoint service resource name, valued by its associated services.
50-
//
51-
// ***
52-
// When a service's waypoint needs to be converted, first check whether the waypoint can be found in this map.
53-
// If it can be found, convert it directly. Otherwise, add it to the waypointAssociatedServices and wait.
54-
// When the corresponding waypoint service is added to the cache, it will be processed and returned uniformly.
55-
// ***
56-
waypointToServices map[string]*waypointAssociatedServices
57-
58-
// Used to locate relevant waypoint when deleting or updating service.
59-
// Keyed by service resource name, valued by associated waypoint's resource name.
60-
serviceToWaypoint map[string]string
61-
}
62-
63-
type waypointAssociatedServices struct {
64-
mutex sync.RWMutex
65-
// IP address of waypoint.
66-
// If it is nil, it means that the waypoint service has not been processed yet.
67-
address *workloadapi.NetworkAddress
68-
69-
// Associated services of this waypoint.
70-
// The key of this map is service resource name and value is corresponding service structure.
71-
services map[string]*workloadapi.Service
72-
}
73-
74-
func newWaypointAssociatedServices(addr *workloadapi.NetworkAddress) *waypointAssociatedServices {
75-
return &waypointAssociatedServices{
76-
address: addr,
77-
services: make(map[string]*workloadapi.Service),
78-
}
79-
}
80-
81-
func (w *waypointAssociatedServices) isResolved() bool {
82-
return w.address != nil
83-
}
84-
85-
func (w *waypointAssociatedServices) waypointAddress() *workloadapi.NetworkAddress {
86-
return w.address
87-
}
88-
89-
func (w *waypointAssociatedServices) update(addr *workloadapi.NetworkAddress) []*workloadapi.Service {
90-
w.mutex.Lock()
91-
defer w.mutex.Unlock()
92-
93-
w.address = addr
94-
95-
res := []*workloadapi.Service{}
96-
97-
for _, svc := range w.services {
98-
updateWaypoint(svc, addr)
99-
res = append(res, svc)
100-
}
101-
102-
return res
103-
}
104-
105-
func (w *waypointAssociatedServices) deleteService(resourceName string) {
106-
w.mutex.Lock()
107-
defer w.mutex.Unlock()
108-
109-
delete(w.services, resourceName)
110-
}
111-
112-
func (w *waypointAssociatedServices) addService(resourceName string, service *workloadapi.Service) {
113-
w.mutex.Lock()
114-
defer w.mutex.Unlock()
115-
116-
w.services[resourceName] = service
11739
}
11840

11941
func NewServiceCache() *serviceCache {
12042
return &serviceCache{
12143
servicesByResourceName: make(map[string]*workloadapi.Service),
122-
waypointToServices: make(map[string]*waypointAssociatedServices),
123-
serviceToWaypoint: make(map[string]string),
12444
}
12545
}
12646

@@ -130,62 +50,12 @@ func (s *serviceCache) AddOrUpdateService(svc *workloadapi.Service) {
13050
resourceName := svc.ResourceName()
13151

13252
s.servicesByResourceName[resourceName] = svc
133-
134-
// If this is a service with an IP address type waypoint, no processing is required and
135-
// return directly.
136-
if svc.GetWaypoint() == nil || svc.GetWaypoint().GetAddress() != nil {
137-
// Service may become unassociated with waypoint.
138-
if waypoint, ok := s.serviceToWaypoint[resourceName]; ok {
139-
delete(s.serviceToWaypoint, resourceName)
140-
s.waypointToServices[waypoint].deleteService(resourceName)
141-
}
142-
return
143-
}
144-
145-
// If this is a svc with hostname waypoint.
146-
hostname := svc.GetWaypoint().GetHostname()
147-
waypointResourceName := hostname.GetNamespace() + "/" + hostname.GetHostname()
148-
149-
if waypoint, ok := s.serviceToWaypoint[resourceName]; ok && waypoint != waypointResourceName {
150-
// Service updated associated waypoint, delete previous association first.
151-
delete(s.serviceToWaypoint, resourceName)
152-
s.waypointToServices[waypoint].deleteService(resourceName)
153-
}
154-
155-
log.Debugf("Update svc %s with waypoint %s", svc.ResourceName(), waypointResourceName)
156-
if associated, ok := s.waypointToServices[waypointResourceName]; ok {
157-
if associated.isResolved() {
158-
// The waypoint corresponding to this service has been resolved.
159-
updateWaypoint(svc, associated.waypointAddress())
160-
}
161-
} else {
162-
// Try to find the waypoint service from the cache.
163-
waypointService := s.servicesByResourceName[waypointResourceName]
164-
var addr *workloadapi.NetworkAddress
165-
if waypointService != nil && len(waypointService.GetAddresses()) != 0 {
166-
addr = waypointService.GetAddresses()[0]
167-
updateWaypoint(svc, waypointService.GetAddresses()[0])
168-
}
169-
s.waypointToServices[waypointResourceName] = newWaypointAssociatedServices(addr)
170-
}
171-
s.serviceToWaypoint[resourceName] = waypointResourceName
172-
// Anyway, add svc to the association list.
173-
s.waypointToServices[waypointResourceName].addService(resourceName, svc)
17453
}
17554

17655
func (s *serviceCache) DeleteService(resourceName string) {
17756
s.mutex.Lock()
17857
defer s.mutex.Unlock()
17958
delete(s.servicesByResourceName, resourceName)
180-
181-
// This service has waypoint.
182-
if waypoint, ok := s.serviceToWaypoint[resourceName]; ok {
183-
delete(s.serviceToWaypoint, resourceName)
184-
s.waypointToServices[waypoint].deleteService(resourceName)
185-
}
186-
187-
// This may be a waypoint service.
188-
delete(s.waypointToServices, resourceName)
18959
}
19060

19161
func (s *serviceCache) List() []*workloadapi.Service {
@@ -204,39 +74,3 @@ func (s *serviceCache) GetService(resourceName string) *workloadapi.Service {
20474
defer s.mutex.RUnlock()
20575
return s.servicesByResourceName[resourceName]
20676
}
207-
208-
// RefreshWaypoint is used to process waypoint service.
209-
// If it is a newly added waypoint service, it returns a series of services that need to be updated
210-
// whose hostname type waypoint address should be converted to IP address type. These services were
211-
// processed earlier but the hostname of the related waypoint could not be resolved at that time.
212-
func (s *serviceCache) RefreshWaypoint(svc *workloadapi.Service) []*workloadapi.Service {
213-
if len(svc.GetAddresses()) == 0 {
214-
return nil
215-
}
216-
217-
address := svc.GetAddresses()[0]
218-
resourceName := svc.ResourceName()
219-
220-
s.mutex.Lock()
221-
defer s.mutex.Unlock()
222-
223-
res := []*workloadapi.Service{}
224-
// If this svc is a waypoint service, may need refreshing.
225-
if associated, ok := s.waypointToServices[resourceName]; ok {
226-
waypointAddr := associated.waypointAddress()
227-
if waypointAddr != nil && waypointAddr.String() == address.String() {
228-
return nil
229-
}
230-
231-
log.Debugf("Refreshing services associated with waypoint %s", resourceName)
232-
res = associated.update(address)
233-
}
234-
235-
return res
236-
}
237-
238-
func updateWaypoint(svc *workloadapi.Service, addr *workloadapi.NetworkAddress) {
239-
svc.GetWaypoint().Destination = &workloadapi.GatewayAddress_Address{
240-
Address: addr,
241-
}
242-
}

0 commit comments

Comments
 (0)