Skip to content

Commit bf3bf9e

Browse files
committed
improve schedulingQueue to avoid making zombie scheduler
1 parent 1a94fc0 commit bf3bf9e

File tree

4 files changed

+139
-23
lines changed

4 files changed

+139
-23
lines changed

cloud/scheduler/queue/queue.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import (
88
)
99

1010
type SchedulingQueue struct {
11-
activeQ []*qemuSpec
12-
lock *sync.Cond
11+
activeQ []*qemuSpec
12+
lock *sync.Cond
13+
shuttingDown bool
1314
}
1415

1516
func New() *SchedulingQueue {
@@ -30,22 +31,47 @@ type qemuSpec struct {
3031
func (s *SchedulingQueue) Add(ctx context.Context, config *api.VirtualMachineCreateOptions) {
3132
s.lock.L.Lock()
3233
defer s.lock.L.Unlock()
34+
35+
if s.shuttingDown {
36+
return
37+
}
38+
3339
s.activeQ = append(s.activeQ, &qemuSpec{ctx: ctx, config: config})
3440
s.lock.Signal()
3541
}
3642

37-
// return next qemuSpec
38-
// to do : break if context is done
39-
func (s *SchedulingQueue) NextQEMU(ctx context.Context) *qemuSpec {
40-
// wait
43+
// return length of active queue
44+
// func (s *SchedulingQueue) Len() int {
45+
// s.lock.L.Lock()
46+
// defer s.lock.L.Unlock()
47+
// return len(s.activeQ)
48+
// }
49+
50+
// return next qemuSpec
51+
func (s *SchedulingQueue) Get() (spec *qemuSpec, shutdown bool) {
4152
s.lock.L.Lock()
4253
defer s.lock.L.Unlock()
43-
for len(s.activeQ) == 0 {
54+
for len(s.activeQ) == 0 && !s.shuttingDown {
4455
s.lock.Wait()
4556
}
46-
spec := s.activeQ[0]
57+
if len(s.activeQ) == 0 {
58+
return nil, true
59+
}
60+
61+
spec = s.activeQ[0]
62+
// The underlying array still exists and references this object,
63+
// so the object will not be garbage collected.
64+
s.activeQ[0] = nil
4765
s.activeQ = s.activeQ[1:]
48-
return spec
66+
return spec, false
67+
}
68+
69+
// shut down the queue
70+
func (s *SchedulingQueue) ShutDown() {
71+
s.lock.L.Lock()
72+
defer s.lock.L.Unlock()
73+
s.shuttingDown = true
74+
s.lock.Broadcast()
4975
}
5076

5177
func (s *qemuSpec) Config() *api.VirtualMachineCreateOptions {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package queue_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
. "github.com/onsi/ginkgo/v2"
9+
. "github.com/onsi/gomega"
10+
"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/queue"
11+
"github.com/sp-yduck/proxmox-go/api"
12+
)
13+
14+
func TestQueue(t *testing.T) {
15+
RegisterFailHandler(Fail)
16+
RunSpecs(t, "Queue Suite")
17+
}
18+
19+
var _ = Describe("New", Label("unit", "queue"), func() {
20+
It("shoud not error", func() {
21+
q := queue.New()
22+
Expect(q).ToNot(BeNil())
23+
})
24+
})
25+
26+
var _ = Describe("Add", Label("unit", "queue"), func() {
27+
q := queue.New()
28+
29+
It("should not error", func() {
30+
q.Add(context.Background(), &api.VirtualMachineCreateOptions{Name: "foo"})
31+
})
32+
})
33+
34+
var _ = Describe("Get", Label("unit", "queue"), func() {
35+
var q *queue.SchedulingQueue
36+
37+
BeforeEach(func() {
38+
q = queue.New()
39+
})
40+
41+
Context("normal", func() {
42+
It("should run properly", func() {
43+
c := &api.VirtualMachineCreateOptions{Name: "foo"}
44+
q.Add(context.Background(), c)
45+
qemu, shutdown := q.Get()
46+
Expect(qemu.Config()).To(Equal(c))
47+
Expect(shutdown).To(BeFalse())
48+
})
49+
})
50+
51+
Context("shutdown empty queue after 1 sec", func() {
52+
It("should get nil", func() {
53+
go func() {
54+
time.Sleep(1 * time.Second)
55+
q.ShutDown()
56+
}()
57+
qemu, shutdown := q.Get()
58+
Expect(qemu).To(BeNil())
59+
Expect(shutdown).To(BeTrue())
60+
})
61+
})
62+
})

cloud/scheduler/scheduler.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (m *Manager) GetOrCreateScheduler(client *proxmox.Service) *Scheduler {
4747
if err != nil {
4848
// create new scheduler without registering
4949
// to avoid leaving it as a zombie scheduler, set a timeout on it
50-
sched := m.NewScheduler(client, WithCancelContext(context.WithTimeout(m.ctx, 5*time.Minute)))
50+
sched := m.NewScheduler(client, WithTimeout(1*time.Minute))
5151
return sched
5252
}
5353
m.params.Logger = m.params.Logger.WithValues("scheduler ID", *schedID)
@@ -90,11 +90,17 @@ func (m *Manager) NewScheduler(client *proxmox.Service, opts ...SchedulerOption)
9090
}
9191

9292
type SchedulerOption func(s *Scheduler)
93+
type CancelFunc func()
9394

94-
func WithCancelContext(ctx context.Context, cancel context.CancelFunc) SchedulerOption {
95+
func (s *Scheduler) WithCancel() (*Scheduler, CancelFunc) {
96+
return s, s.Stop
97+
}
98+
99+
// set timeout to scheduler
100+
func WithTimeout(timeout time.Duration) SchedulerOption {
95101
return func(s *Scheduler) {
96-
s.ctx = ctx
97-
s.cancel = cancel
102+
s, cancel := s.WithCancel()
103+
go time.AfterFunc(timeout, cancel)
98104
}
99105
}
100106

@@ -123,7 +129,7 @@ type Scheduler struct {
123129

124130
// to do : cache
125131

126-
// map[qemu name]*framework.CycleState
132+
// map[qemu name]chan *framework.CycleState
127133
resultMap map[string]chan *framework.CycleState
128134
logger logr.Logger
129135

@@ -170,14 +176,18 @@ func (s *Scheduler) RunAsync() {
170176
}
171177

172178
// stop scheduler
173-
// func (s *Scheduler) Stop() {
174-
// defer s.cancel()
175-
// }
179+
func (s *Scheduler) Stop() {
180+
defer s.cancel()
181+
s.schedulingQueue.ShutDown()
182+
}
176183

177184
// retrieve one qemuSpec from queue and try to create
178185
// new qemu according to the qemuSpec
179186
func (s *Scheduler) ScheduleOne(ctx context.Context) {
180-
qemu := s.schedulingQueue.NextQEMU(ctx)
187+
qemu, shutdown := s.schedulingQueue.Get()
188+
if shutdown {
189+
return
190+
}
181191
config := qemu.Config()
182192
qemuCtx := qemu.Context()
183193
s.logger = s.logger.WithValues("qemu", config.Name)
@@ -230,7 +240,9 @@ func (s *Scheduler) WaitStatus(ctx context.Context, config *api.VirtualMachineCr
230240
delete(s.resultMap, config.Name)
231241
return *state, nil
232242
case <-ctx.Done():
233-
return framework.CycleState{}, fmt.Errorf("exceed timeout deadline")
243+
err := fmt.Errorf("exceed timeout deadline. schedulingQueue might be shutdowned")
244+
s.logger.Error(err, fmt.Sprintf("schedulingQueue: %v", *s.schedulingQueue))
245+
return framework.CycleState{}, err
234246
}
235247
}
236248

cloud/scheduler/scheduler_test.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ import (
66

77
. "github.com/onsi/ginkgo/v2"
88
. "github.com/onsi/gomega"
9+
"github.com/sp-yduck/proxmox-go/api"
910
"sigs.k8s.io/controller-runtime/pkg/log/zap"
1011

1112
"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler"
1213
"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework"
13-
"github.com/sp-yduck/proxmox-go/api"
14-
// "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework"
1514
)
1615

1716
var _ = Describe("NewManager", Label("unit", "scheduler"), func() {
@@ -40,19 +39,36 @@ var _ = Describe("GetOrCreateScheduler", Label("integration", "scheduler"), func
4039
})
4140
})
4241

43-
var _ = Describe("Run (RunAsync) / IsRunning", Label("unit", "scheduler"), func() {
42+
var _ = Describe("Run (RunAsync) / IsRunning / Stop", Label("unit", "scheduler"), func() {
4443
manager := scheduler.NewManager(scheduler.SchedulerParams{zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))})
4544

4645
Context("with minimal scheduler", func() {
4746
It("should not error", func() {
4847
sched := manager.NewScheduler(proxmoxSvc)
4948
sched.RunAsync()
5049
time.Sleep(1 * time.Second)
51-
Expect(sched.IsRunning()).To(Equal(true))
50+
Expect(sched.IsRunning()).To(BeTrue())
51+
sched.Stop()
52+
time.Sleep(1 * time.Second)
53+
Expect(sched.IsRunning()).To(BeFalse())
5254
})
5355
})
5456
})
5557

58+
var _ = Describe("WithTimeout", Label("integration", "scheduler"), func() {
59+
manager := scheduler.NewManager(scheduler.SchedulerParams{})
60+
61+
It("should not error", func() {
62+
sched := manager.NewScheduler(proxmoxSvc, scheduler.WithTimeout(2*time.Second))
63+
Expect(sched).NotTo(BeNil())
64+
sched.RunAsync()
65+
time.Sleep(1 * time.Second)
66+
Expect(sched.IsRunning()).To(BeTrue())
67+
time.Sleep(2 * time.Second)
68+
Expect(sched.IsRunning()).To(BeFalse())
69+
})
70+
})
71+
5672
var _ = Describe("CreateQEMU", Label("integration"), func() {
5773
manager := scheduler.NewManager(scheduler.SchedulerParams{zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))})
5874
var result framework.SchedulerResult

0 commit comments

Comments
 (0)