Skip to content

Commit de7dbcd

Browse files
zachsmith1 authored and sttts committed
Prefactoring: controller-sharding
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
1 parent 7527cc0 commit de7dbcd

File tree

3 files changed

+263
-36
lines changed

3 files changed

+263
-36
lines changed

pkg/controller/controller.go

Lines changed: 30 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -91,7 +91,7 @@ func NewTypedUnmanaged[request mcreconcile.ClusterAware[request]](name string, m
9191
}
9292
return &mcController[request]{
9393
TypedController: c,
94-
clusters: make(map[string]engagedCluster),
94+
clusters: make(map[string]*engagedCluster),
9595
}, nil
9696
}
9797

@@ -101,28 +101,39 @@ type mcController[request mcreconcile.ClusterAware[request]] struct {
101101
controller.TypedController[request]
102102

103103
lock sync.Mutex
104-
clusters map[string]engagedCluster
104+
clusters map[string]*engagedCluster
105105
sources []mcsource.TypedSource[client.Object, request]
106106
}
107107

108108
type engagedCluster struct {
109109
name string
110110
cluster cluster.Cluster
111+
ctx context.Context
112+
cancel context.CancelFunc
111113
}
112114

113115
func (c *mcController[request]) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
114116
c.lock.Lock()
115117
defer c.lock.Unlock()
116118

117-
if old, ok := c.clusters[name]; ok && old.cluster == cl {
118-
return nil
119+
// Check if this cluster is already engaged with the same implementation and a still-live context
120+
if old, ok := c.clusters[name]; ok {
121+
if old.cluster == cl && old.ctx.Err() == nil {
122+
// Same impl, engagement still live → nothing to do
123+
return nil
124+
}
125+
// Re-engage: either old ctx is done, or impl changed. Stop the old one if still live.
126+
if old.ctx.Err() == nil {
127+
old.cancel()
128+
}
129+
delete(c.clusters, name)
119130
}
120131

121-
ctx, cancel := context.WithCancel(ctx) //nolint:govet // cancel is called in the error case only.
132+
engCtx, cancel := context.WithCancel(ctx)
122133

123134
// pass through in case the controller itself is cluster aware
124135
if ctrl, ok := c.TypedController.(multicluster.Aware); ok {
125-
if err := ctrl.Engage(ctx, name, cl); err != nil {
136+
if err := ctrl.Engage(engCtx, name, cl); err != nil {
126137
cancel()
127138
return err
128139
}
@@ -135,49 +146,49 @@ func (c *mcController[request]) Engage(ctx context.Context, name string, cl clus
135146
cancel()
136147
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
137148
}
138-
if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
149+
if err := c.TypedController.Watch(startWithinContext[request](engCtx, src)); err != nil {
139150
cancel()
140151
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
141152
}
142153
}
143154

144-
ec := engagedCluster{
155+
ec := &engagedCluster{
145156
name: name,
146157
cluster: cl,
158+
ctx: engCtx,
159+
cancel: cancel,
147160
}
148161
c.clusters[name] = ec
149-
go func() {
162+
go func(ctx context.Context, key string, token *engagedCluster) {
163+
<-ctx.Done()
150164
c.lock.Lock()
151165
defer c.lock.Unlock()
152-
if c.clusters[name] == ec {
153-
delete(c.clusters, name)
166+
if cur, ok := c.clusters[key]; ok && cur == token {
167+
delete(c.clusters, key)
154168
}
155-
}()
169+
// note: ctx is already done at this point, so there is no need to call cancel() here
170+
}(engCtx, name, ec)
156171

157-
return nil //nolint:govet // cancel is called in the error case only.
172+
return nil
158173
}
159174

160175
func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error {
161176
c.lock.Lock()
162177
defer c.lock.Unlock()
163178

164-
ctx, cancel := context.WithCancel(context.Background()) //nolint:govet // cancel is called in the error case only.
165-
166179
for name, eng := range c.clusters {
167180
src, err := src.ForCluster(name, eng.cluster)
168181
if err != nil {
169-
cancel()
170182
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
171183
}
172-
if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
173-
cancel()
184+
if err := c.TypedController.Watch(startWithinContext[request](eng.ctx, src)); err != nil {
174185
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
175186
}
176187
}
177188

178189
c.sources = append(c.sources, src)
179190

180-
return nil //nolint:govet // cancel is called in the error case only.
191+
return nil
181192
}
182193

183194
func startWithinContext[request mcreconcile.ClusterAware[request]](ctx context.Context, src source.TypedSource[request]) source.TypedSource[request] {

pkg/manager/manager.go

Lines changed: 14 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,9 @@ type Runnable interface {
124124

125125
var _ Manager = &mcManager{}
126126

127+
// Option mutates mcManager configuration.
128+
type Option func(*mcManager)
129+
127130
type mcManager struct {
128131
manager.Manager
129132
provider multicluster.Provider
@@ -134,20 +137,24 @@ type mcManager struct {
134137
// New returns a new Manager for creating Controllers. The provider is used to
135138
// discover and manage clusters. With a provider set to nil, the manager will
136139
// behave like a regular controller-runtime manager.
137-
func New(config *rest.Config, provider multicluster.Provider, opts Options) (Manager, error) {
140+
func New(config *rest.Config, provider multicluster.Provider, opts manager.Options, mcOpts ...Option) (Manager, error) {
138141
mgr, err := manager.New(config, opts)
139142
if err != nil {
140143
return nil, err
141144
}
142-
return WithMultiCluster(mgr, provider)
145+
return WithMultiCluster(mgr, provider, mcOpts...)
143146
}
144147

145148
// WithMultiCluster wraps a host manager to run multi-cluster controllers.
146-
func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Manager, error) {
147-
return &mcManager{
148-
Manager: mgr,
149-
provider: provider,
150-
}, nil
149+
func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider, mcOpts ...Option) (Manager, error) {
150+
m := &mcManager{Manager: mgr, provider: provider}
151+
152+
// Apply options before wiring the Runnable so overrides take effect early.
153+
for _, o := range mcOpts {
154+
o(m)
155+
}
156+
157+
return m, nil
151158
}
152159

153160
// GetCluster returns a cluster for the given identifying cluster name. Get

0 commit comments

Comments
 (0)