7777 AppWrapperVersion = "UNKNOWN"
7878)
7979
// Names of the CustomResourceDefinitions whose presence gates optional
// controller setup. They are compared against CRD object names.
const (
	// workloadAPI is the CRD name checked before the AppWrapper controller
	// and the workload indexer are set up.
	workloadAPI = "workloads.kueue.x-k8s.io"
	// rayclusterAPI is the CRD name checked before the RayCluster controller
	// is set up.
	rayclusterAPI = "rayclusters.ray.io"
)
84+
8085func init () {
8186 utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
8287 // Ray
@@ -115,7 +120,7 @@ func main() {
115120 "date" , BuildDate ,
116121 )
117122
118- ctx := ctrl .SetupSignalHandler ()
123+ ctx , cancel := context . WithCancel ( ctrl .SetupSignalHandler () )
119124
120125 cfg := & config.CodeFlareOperatorConfiguration {
121126 ClientConnection : & config.ClientConnection {
@@ -188,7 +193,7 @@ func main() {
188193 }
189194
190195 setupLog .Info ("setting up indexers" )
191- exitOnError (setupIndexers (ctx , mgr , cfg ), "unable to setup indexers" )
196+ exitOnError (setupWorkloadIndexer (ctx , cancel , mgr , cfg ), "unable to setup indexers" )
192197
193198 setupLog .Info ("setting up health endpoints" )
194199 exitOnError (setupProbeEndpoints (mgr , cfg , certsReady ), "unable to set up health check" )
@@ -197,7 +202,7 @@ func main() {
197202 go waitForRayClusterAPIandSetupController (ctx , mgr , cfg , isOpenShift (ctx , kubeClient .DiscoveryClient ), certsReady )
198203
199204 setupLog .Info ("setting up AppWrapper controller" )
200- go setupAppWrapperController ( mgr , cfg , certsReady )
205+ go waitForWorkloadAPIAndSetupAppWrapperController ( ctx , mgr , cfg , certsReady )
201206
202207 setupLog .Info ("starting manager" )
203208 exitOnError (mgr .Start (ctx ), "error running manager" )
@@ -222,75 +227,60 @@ func setupRayClusterController(mgr ctrl.Manager, cfg *config.CodeFlareOperatorCo
222227 return rayClusterController .SetupWithManager (mgr )
223228}
224229
225- // +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
226-
227230func waitForRayClusterAPIandSetupController (ctx context.Context , mgr ctrl.Manager , cfg * config.CodeFlareOperatorConfiguration , isOpenShift bool , certsReady chan struct {}) {
228- crdClient , err := apiextensionsclientset .NewForConfig (mgr .GetConfig ())
229- exitOnError (err , "unable to create CRD client" )
230-
231- crdList , err := crdClient .ApiextensionsV1 ().CustomResourceDefinitions ().List (ctx , metav1.ListOptions {})
232- exitOnError (err , "unable to list CRDs" )
233-
234- if slices .ContainsFunc (crdList .Items , func (crd apiextensionsv1.CustomResourceDefinition ) bool {
235- return crd .Name == "rayclusters.ray.io"
236- }) {
231+ if isAPIAvailable (ctx , mgr , rayclusterAPI ) {
237232 exitOnError (setupRayClusterController (mgr , cfg , isOpenShift , certsReady ), "unable to setup RayCluster controller" )
238- }
239-
240- retryWatcher , err := retrywatch .NewRetryWatcher (crdList .ResourceVersion , & cache.ListWatch {
241- ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
242- return crdClient .ApiextensionsV1 ().CustomResourceDefinitions ().List (ctx , metav1.ListOptions {})
243- },
244- WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
245- return crdClient .ApiextensionsV1 ().CustomResourceDefinitions ().Watch (ctx , metav1.ListOptions {})
246- },
247- })
248- exitOnError (err , "unable to create retry watcher" )
249-
250- defer retryWatcher .Stop ()
251- for {
252- select {
253- case <- ctx .Done ():
254- return
255- case event := <- retryWatcher .ResultChan ():
256- switch event .Type {
257- case watch .Error :
258- exitOnError (apierrors .FromObject (event .Object ), "error watching for RayCluster API" )
259-
260- case watch .Added , watch .Modified :
261- if crd := event .Object .(* apiextensionsv1.CustomResourceDefinition ); crd .Name == "rayclusters.ray.io" &&
262- slices .ContainsFunc (crd .Status .Conditions , func (condition apiextensionsv1.CustomResourceDefinitionCondition ) bool {
263- return condition .Type == apiextensionsv1 .Established && condition .Status == apiextensionsv1 .ConditionTrue
264- }) {
265- setupLog .Info ("RayCluster API installed, setting up controller" )
266- exitOnError (setupRayClusterController (mgr , cfg , isOpenShift , certsReady ), "unable to setup RayCluster controller" )
267- return
268- }
269- }
270- }
233+ } else {
234+ waitForAPI (ctx , mgr , rayclusterAPI , func () {
235+ exitOnError (setupRayClusterController (mgr , cfg , isOpenShift , certsReady ), "unable to setup RayCluster controller" )
236+ })
271237 }
272238}
273239
274- func setupAppWrapperController (mgr ctrl.Manager , cfg * config.CodeFlareOperatorConfiguration , certsReady chan struct {}) {
240+ func setupAppWrapperController (mgr ctrl.Manager , cfg * config.CodeFlareOperatorConfiguration , certsReady chan struct {}) error {
275241 setupLog .Info ("Waiting for certificate generation to complete" )
276242 <- certsReady
277243 setupLog .Info ("Certs ready" )
278244
279- if cfg .AppWrapper != nil && ptr .Deref (cfg .AppWrapper .Enabled , false ) {
280- exitOnError (awctrl .SetupWebhooks (mgr , cfg .AppWrapper .Config ), "error setting up AppWrapper webhook" )
281- exitOnError (awctrl .SetupControllers (mgr , cfg .AppWrapper .Config ), "error setting up AppWrapper controller" )
245+ setupLog .Info ("Setting up AppWrapper webhook and controller" )
246+ if err := awctrl .SetupWebhooks (mgr , cfg .AppWrapper .Config ); err != nil {
247+ return err
248+ }
249+ return awctrl .SetupControllers (mgr , cfg .AppWrapper .Config )
250+ }
251+
252+ func waitForWorkloadAPIAndSetupAppWrapperController (ctx context.Context , mgr ctrl.Manager , cfg * config.CodeFlareOperatorConfiguration , certsReady chan struct {}) {
253+ if cfg .AppWrapper == nil || ! ptr .Deref (cfg .AppWrapper .Enabled , false ) {
254+ setupLog .Info ("AppWrapper controller disabled by config" )
255+ }
256+
257+ if isAPIAvailable (ctx , mgr , workloadAPI ) {
258+ exitOnError (setupAppWrapperController (mgr , cfg , certsReady ), "unable to setup AppWrapper controller" )
282259 } else {
283- setupLog .Info ("AppWrapper controller disabled" , "Config flag value" , * cfg .AppWrapper .Enabled )
260+ waitForAPI (ctx , mgr , workloadAPI , func () {
261+ exitOnError (setupAppWrapperController (mgr , cfg , certsReady ), "unable to setup AppWrapper controller" )
262+ })
284263 }
285264}
286265
287- func setupIndexers (ctx context.Context , mgr ctrl.Manager , cfg * config.CodeFlareOperatorConfiguration ) error {
288- if cfg .AppWrapper != nil && ptr .Deref (cfg .AppWrapper .Enabled , false ) {
289- if err := awctrl .SetupIndexers (ctx , mgr , cfg .AppWrapper .Config ); err != nil {
290- return fmt .Errorf ("workload indexer: %w" , err )
291- }
266+ func setupWorkloadIndexer (ctx context.Context , cancel context.CancelFunc , mgr ctrl.Manager , cfg * config.CodeFlareOperatorConfiguration ) error {
267+ if cfg .AppWrapper == nil || ! ptr .Deref (cfg .AppWrapper .Enabled , false ) {
268+ setupLog .Info ("Workload indexer disabled by config" )
269+ return nil
270+ }
271+
272+ if isAPIAvailable (ctx , mgr , workloadAPI ) {
273+ return awctrl .SetupIndexers (ctx , mgr , cfg .AppWrapper .Config )
274+ } else {
275+ // If AppWrappers are enabled and the Workload API becomes available later, initiate an orderly
276+ // restart of the codeflare operator to enable the workload indexer to be setup in the the new instance of the operator.
277+ // It is not possible to add an indexer once the mgr has started so, a restart if the only avenue.
278+ go waitForAPI (ctx , mgr , workloadAPI , func () {
279+ setupLog .Info ("Workload API now available; triggering controller restart" )
280+ cancel ()
281+ })
282+ return nil
292283 }
293- return nil
294284}
295285
296286// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;update
@@ -437,3 +427,68 @@ func getClusterDomain(ctx context.Context, configClient *clientset.Clientset) (s
437427
438428 return domain , nil
439429}
430+
431+ // +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
432+
433+ func isAPIAvailable (ctx context.Context , mgr ctrl.Manager , apiName string ) bool {
434+ crdClient , err := apiextensionsclientset .NewForConfig (mgr .GetConfig ())
435+ exitOnError (err , "unable to create CRD client" )
436+
437+ crdList , err := crdClient .ApiextensionsV1 ().CustomResourceDefinitions ().List (ctx , metav1.ListOptions {})
438+ exitOnError (err , "unable to list CRDs" )
439+
440+ return slices .ContainsFunc (crdList .Items , func (crd apiextensionsv1.CustomResourceDefinition ) bool {
441+ return crd .Name == apiName
442+ })
443+ }
444+
445+ func waitForAPI (ctx context.Context , mgr ctrl.Manager , apiName string , action func ()) {
446+ crdClient , err := apiextensionsclientset .NewForConfig (mgr .GetConfig ())
447+ exitOnError (err , "unable to create CRD client" )
448+
449+ crdList , err := crdClient .ApiextensionsV1 ().CustomResourceDefinitions ().List (ctx , metav1.ListOptions {})
450+ exitOnError (err , "unable to list CRDs" )
451+
452+ // If API is already available, just invoke action
453+ if slices .ContainsFunc (crdList .Items , func (crd apiextensionsv1.CustomResourceDefinition ) bool {
454+ return crd .Name == apiName
455+ }) {
456+ action ()
457+ return
458+ }
459+
460+ // Wait for the API to become available then invoke action
461+ setupLog .Info (fmt .Sprintf ("API %v not available, setting up retry watcher" , apiName ))
462+ retryWatcher , err := retrywatch .NewRetryWatcher (crdList .ResourceVersion , & cache.ListWatch {
463+ ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
464+ return crdClient .ApiextensionsV1 ().CustomResourceDefinitions ().List (ctx , metav1.ListOptions {})
465+ },
466+ WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
467+ return crdClient .ApiextensionsV1 ().CustomResourceDefinitions ().Watch (ctx , metav1.ListOptions {})
468+ },
469+ })
470+ exitOnError (err , "unable to create retry watcher" )
471+
472+ defer retryWatcher .Stop ()
473+ for {
474+ select {
475+ case <- ctx .Done ():
476+ return
477+ case event := <- retryWatcher .ResultChan ():
478+ switch event .Type {
479+ case watch .Error :
480+ exitOnError (apierrors .FromObject (event .Object ), fmt .Sprintf ("error watching for API %v" , apiName ))
481+
482+ case watch .Added , watch .Modified :
483+ if crd := event .Object .(* apiextensionsv1.CustomResourceDefinition ); crd .Name == apiName &&
484+ slices .ContainsFunc (crd .Status .Conditions , func (condition apiextensionsv1.CustomResourceDefinitionCondition ) bool {
485+ return condition .Type == apiextensionsv1 .Established && condition .Status == apiextensionsv1 .ConditionTrue
486+ }) {
487+ setupLog .Info (fmt .Sprintf ("API %v installed, invoking deferred action" , apiName ))
488+ action ()
489+ return
490+ }
491+ }
492+ }
493+ }
494+ }
0 commit comments