diff --git a/pkg/coscheduling/core/core.go b/pkg/coscheduling/core/core.go index cd7ad6fb76..1a713a1a31 100644 --- a/pkg/coscheduling/core/core.go +++ b/pkg/coscheduling/core/core.go @@ -26,11 +26,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" informerv1 "k8s.io/client-go/informers/core/v1" - listerv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -88,8 +86,8 @@ type PodGroupManager struct { permittedPG *gocache.Cache // backedOffPG stores the podgorup name which failed scheudling recently. backedOffPG *gocache.Cache - // podLister is pod lister - podLister listerv1.PodLister + // podInformer is pod informer + podInformer cache.SharedIndexInformer // assignedPodsByPG stores the pods assumed or bound for podgroups assignedPodsByPG map[string]sets.Set[string] sync.RWMutex @@ -124,7 +122,7 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha client: client, snapshotSharedLister: snapshotSharedLister, scheduleTimeout: scheduleTimeout, - podLister: podInformer.Lister(), + podInformer: podInformer.Informer(), permittedPG: gocache.New(3*time.Second, 3*time.Second), backedOffPG: gocache.New(10*time.Second, 10*time.Second), assignedPodsByPG: map[string]sets.Set[string]{}, @@ -175,8 +173,8 @@ func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Durati // in the given state, with a reserved key "kubernetes.io/pods-to-activate". func (pgMgr *PodGroupManager) ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState) { lh := klog.FromContext(ctx) - pgName := util.GetPodGroupLabel(pod) - if pgName == "" { + pgFullName := util.GetPodGroupFullName(pod) + if pgFullName == "" { return } @@ -186,29 +184,27 @@ func (pgMgr *PodGroupManager) ActivateSiblings(ctx context.Context, pod *corev1. } else if s, ok := c.(*PermitState); !ok || !s.Activate { return } - - pods, err := pgMgr.podLister.Pods(pod.Namespace).List( - labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}), - ) + indexer := pgMgr.podInformer.GetIndexer() + groupPods, err := podsBelongToGroup(indexer, pgFullName) if err != nil { - lh.Error(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName) + lh.Error(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgFullName) return } - - for i := range pods { - if pods[i].UID == pod.UID { - pods = append(pods[:i], pods[i+1:]...) - break + siblings := make([]*corev1.Pod, 0, len(groupPods)) + for i := range groupPods { + tmpPod := groupPods[i] + if tmpPod.UID != pod.UID { + siblings = append(siblings, tmpPod) } } - if len(pods) != 0 { + if len(siblings) != 0 { if c, err := state.Read(framework.PodsToActivateKey); err == nil { if s, ok := c.(*framework.PodsToActivate); ok { s.Lock() - for _, pod := range pods { - namespacedName := GetNamespacedName(pod) - s.Map[namespacedName] = pod + for _, sibling := range siblings { + namespacedName := GetNamespacedName(sibling) + s.Map[namespacedName] = sibling } s.Unlock() } @@ -231,17 +227,15 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er if _, exist := pgMgr.backedOffPG.Get(pgFullName); exist { return fmt.Errorf("podGroup %v failed recently", pgFullName) } - - pods, err := pgMgr.podLister.Pods(pod.Namespace).List( - labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}), - ) + indexer := pgMgr.podInformer.GetIndexer() + siblings, err := podsBelongToGroup(indexer, pgFullName) if err != nil { return fmt.Errorf("podLister list pods failed: %w", err) } - if len(pods) < int(pg.Spec.MinMember) { + if len(siblings) < int(pg.Spec.MinMember) { return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+ - "current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember) + "current pods number: %v, minMember of group: %v", pod.Name, len(siblings), pg.Spec.MinMember) } if pg.Spec.MinResources == nil { @@ -272,6 +266,45 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er return nil } +// podsBelongToGroup returns pods indexed by the PodGroup's full name. It falls back to a +// linear scan when the expected label index is not registered on the informer. +func podsBelongToGroup(indexer cache.Indexer, pgFullName string) ([]*corev1.Pod, error) { + if _, hasIndex := indexer.GetIndexers()[util.LabelIndexerName]; hasIndex { + podsObj, err := indexer.ByIndex(util.LabelIndexerName, pgFullName) + if err != nil { + return nil, err + } + return castPods(podsObj), nil + } + + return filterPods(indexer.List(), pgFullName), nil +} + +func castPods(objs []interface{}) []*corev1.Pod { + pods := make([]*corev1.Pod, 0, len(objs)) + for i := range objs { + pod, ok := objs[i].(*corev1.Pod) + if ok { + pods = append(pods, pod) + } + } + return pods +} + +func filterPods(objs []interface{}, pgFullName string) []*corev1.Pod { + pods := make([]*corev1.Pod, 0, len(objs)) + for i := range objs { + pod, ok := objs[i].(*corev1.Pod) + if !ok { + continue + } + if util.GetPodGroupFullName(pod) == pgFullName { + pods = append(pods, pod) + } + } + return pods +} + // Permit permits a pod to run, if the minMember match, it would send a signal to chan. func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status { pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) diff --git a/pkg/coscheduling/core/core_test.go b/pkg/coscheduling/core/core_test.go index 0968492d87..e3b1ccbcbf 100644 --- a/pkg/coscheduling/core/core_test.go +++ b/pkg/coscheduling/core/core_test.go @@ -167,7 +167,7 @@ func TestPreFilter(t *testing.T) { pgMgr := &PodGroupManager{ client: client, snapshotSharedLister: tu.NewFakeSharedLister(tt.pendingPods, nodes), - podLister: podInformer.Lister(), + podInformer: podInformer.Informer(), scheduleTimeout: &scheduleTimeout, permittedPG: newCache(), backedOffPG: newCache(), diff --git a/pkg/coscheduling/coscheduling.go b/pkg/coscheduling/coscheduling.go index 1543bafb39..f824162e3a 100644 --- a/pkg/coscheduling/coscheduling.go +++ b/pkg/coscheduling/coscheduling.go @@ -85,7 +85,10 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram } // Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning. - handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + util.LabelIndexerName: util.NewIndexByLabelAndNamespace(v1alpha1.PodGroupLabel), + }) scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second pgMgr := core.NewPodGroupManager( diff --git a/pkg/util/client_util.go b/pkg/util/client_util.go index 6da2b209d6..29d5093422 100644 --- a/pkg/util/client_util.go +++ b/pkg/util/client_util.go @@ -3,6 +3,7 @@ package util import ( "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" @@ -10,6 +11,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + LabelIndexerName = "label-indexer" +) + // NewClientWithCachedReader returns a controller runtime Client with cache-baked client. func NewClientWithCachedReader(ctx context.Context, config *rest.Config, scheme *runtime.Scheme) (client.Client, cache.Cache, error) { ccache, err := cache.New(config, cache.Options{ @@ -30,3 +35,19 @@ func NewClientWithCachedReader(ctx context.Context, config *rest.Config, scheme }) return c, ccache, err } + +// NewIndexByLabelAndNamespace returns an indexer function for indexing pods by label and namespace. +func NewIndexByLabelAndNamespace(label string) func(obj interface{}) ([]string, error) { + return func(obj interface{}) ([]string, error) { + labels := obj.(metav1.Object).GetLabels() + if labels == nil { + return nil, nil + } + if labels[label] == "" { + return nil, nil + } + namespace := obj.(metav1.Object).GetNamespace() + + return []string{namespace + "/" + labels[label]}, nil + } +}