Skip to content

Commit 57697a8

Browse files
avagin authored and gvisor-bot committed
kernel: periodically preempt tasks when platform max concurrency is exceeded
Add a new method `ConcurrencyCount()` to the `platform.Platform` interface. This method returns the maximum number of tasks that can run in parallel on the platform. For most platforms, this is effectively infinite (math.MaxInt). For the KVM platform, this is the number of available vCPUs. Introduce a new method `Preempt()` to the `platform.Context` interface. The CPU clock ticker now checks if the number of running tasks is greater than Platform.ConcurrencyCount(). If it is, it calls `t.p.Preempt()` on each running task to trigger a preemption. This mechanism ensures that on platforms with a fixed number of execution units, like KVM with a limited number of vCPUs, tasks are periodically preempted to allow other runnable tasks to be scheduled. This prevents any single task from monopolizing a vCPU and improves overall fairness and responsiveness. The `Preempt()` implementation in most platforms is a no-op. In the KVM platform, `Preempt()` is implemented by leveraging the existing interrupt mechanism, effectively signaling the vCPU to exit. PiperOrigin-RevId: 805926296
1 parent 973cf76 commit 57697a8

File tree

7 files changed

+61
-1
lines changed

7 files changed

+61
-1
lines changed

pkg/sentry/kernel/task_sched.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ func (k *Kernel) runCPUClockTicker() {
197197
allTasks []*Task
198198
incTasks = make([]*Task, k.applicationCores)
199199
)
200+
concurrencyCount := k.ConcurrencyCount()
200201

201202
for {
202203
// Stop CPU clocks while nothing is running.
@@ -244,9 +245,12 @@ func (k *Kernel) runCPUClockTicker() {
244245
// applicationCores running tasks (and their thread groups).
245246
allTasks = k.tasks.Root.TasksAppend(allTasks)
246247
runningTasks := 0
248+
runningAppTasks := 0
247249
for _, t := range allTasks {
248250
state := t.TaskGoroutineState()
249-
if state != TaskGoroutineRunningApp && state != TaskGoroutineRunningSys {
251+
if state == TaskGoroutineRunningApp {
252+
runningAppTasks++
253+
} else if state != TaskGoroutineRunningSys {
250254
continue
251255
}
252256
if runningTasks < len(incTasks) {
@@ -259,6 +263,7 @@ func (k *Kernel) runCPUClockTicker() {
259263
incTasks[i] = t
260264
}
261265
}
266+
preempt := runningAppTasks > concurrencyCount
262267
numIncTasks := min(runningTasks, len(incTasks))
263268
// Shuffle incTasks to ensure that if multiple tasks are in the same
264269
// thread group, then all are equally likely to be
@@ -272,6 +277,9 @@ func (k *Kernel) runCPUClockTicker() {
272277
t.appCPUClock.Add(linux.ClockTick)
273278
t.tg.appCPUClockLast.Store(t)
274279
t.tg.appCPUClock.Add(linux.ClockTick)
280+
if preempt {
281+
t.p.Preempt()
282+
}
275283
fallthrough
276284
case TaskGoroutineRunningSys:
277285
t.appSysCPUClock.Add(linux.ClockTick)

pkg/sentry/platform/interrupt/interrupt.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,14 @@ func (f *Forwarder) NotifyInterrupt() {
9797
}
9898
f.mu.Unlock()
9999
}
100+
101+
// Preempt preempts the running context. It is a weaker version of
102+
// NotifyInterrupt: it only forwards to f.dst when one is set and, unlike
103+
// NotifyInterrupt, never sets the pending flag (the flag that is set when a
// context isn't actually running at this moment), so the request is dropped
// when f.dst is nil. f.mu is held across the check and the forwarded call.
104+
func (f *Forwarder) Preempt() {
105+
f.mu.Lock()
106+
if f.dst != nil {
107+
f.dst.NotifyInterrupt()
108+
}
109+
f.mu.Unlock()
110+
}

pkg/sentry/platform/kvm/context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ func (c *platformContext) Interrupt() {
118118
c.interrupt.NotifyInterrupt()
119119
}
120120

121+
// Preempt implements platform.Context.Preempt.
//
// It delegates to c.interrupt.Preempt(), the same interrupt object that
// Interrupt() uses via NotifyInterrupt().
122+
func (c *platformContext) Preempt() {
123+
c.interrupt.Preempt()
124+
}
125+
121126
// Release implements platform.Context.Release().
122127
func (c *platformContext) Release() {}
123128

pkg/sentry/platform/kvm/kvm.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@ func (k *KVM) NewAddressSpace(any) (platform.AddressSpace, <-chan struct{}, erro
174174
}, nil, nil
175175
}
176176

177+
// ConcurrencyCount implements platform.Platform.ConcurrencyCount.
178+
// KVM can't run more than maxVCPUs contexts concurrently, so the value
// reported is k.machine.maxVCPUs.
179+
func (k *KVM) ConcurrencyCount() int {
180+
return k.machine.maxVCPUs
181+
}
182+
177183
// NewContext returns an interruptible context.
178184
func (k *KVM) NewContext(pkgcontext.Context) platform.Context {
179185
return &platformContext{

pkg/sentry/platform/platform.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ type Platform interface {
116116

117117
// SeccompInfo returns seccomp-related information about this platform.
118118
SeccompInfo() SeccompInfo
119+
120+
// ConcurrencyCount returns the maximum number of contexts that can run
121+
// in parallel. Concurrent calls to Context.Switch() beyond
122+
// ConcurrencyCount() may block until previous calls have returned.
123+
ConcurrencyCount() int
119124
}
120125

121126
// NoCPUPreemptionDetection implements Platform.DetectsCPUPreemption and
@@ -246,6 +251,13 @@ type Context interface {
246251
// ErrContextInterrupt.
247252
Interrupt()
248253

254+
// Preempt interrupts a concurrent call to Switch().
255+
// If Platform.ConcurrencyCount() == math.MaxInt, or if the context is not
256+
// running application code (e.g. it is blocked waiting for the number of
257+
// running contexts to drop below Platform.ConcurrencyCount()), Preempt may
258+
// have no effect.
259+
Preempt()
260+
249261
// Release() releases any resources associated with this context.
250262
Release()
251263

pkg/sentry/platform/ptrace/ptrace.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
package ptrace
4646

4747
import (
48+
"math"
49+
4850
"gvisor.dev/gvisor/pkg/abi/linux"
4951
pkgcontext "gvisor.dev/gvisor/pkg/context"
5052
"gvisor.dev/gvisor/pkg/fd"
@@ -189,6 +191,9 @@ func (c *context) Interrupt() {
189191
c.interrupt.NotifyInterrupt()
190192
}
191193

194+
// Preempt implements platform.Context.Preempt.
//
// It is a no-op: PTrace.ConcurrencyCount returns math.MaxInt, and the
// platform.Context contract allows Preempt to have no effect in that case.
195+
func (c *context) Preempt() {}
196+
192197
// Release implements platform.Context.Release().
193198
func (c *context) Release() {}
194199

@@ -258,6 +263,11 @@ func (p *PTrace) NewAddressSpace(any) (platform.AddressSpace, <-chan struct{}, e
258263
return as, nil, err
259264
}
260265

266+
// ConcurrencyCount implements platform.Platform.ConcurrencyCount.
//
// It returns math.MaxInt, i.e. ptrace reports no practical upper bound on the
// number of contexts that can run in parallel.
267+
func (*PTrace) ConcurrencyCount() int {
268+
return math.MaxInt
269+
}
270+
261271
type constructor struct{}
262272

263273
func (*constructor) New(platform.Options) (platform.Platform, error) {

pkg/sentry/platform/systrap/systrap.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ func (c *platformContext) Interrupt() {
212212
c.interrupt.NotifyInterrupt()
213213
}
214214

215+
// Preempt implements platform.Context.Preempt.
//
// It is a no-op on systrap.
// NOTE(review): Systrap.ConcurrencyCount returns maxSysmsgThreads rather than
// math.MaxInt, so the CPU clock ticker may call this when the number of
// running application tasks exceeds that value — confirm that doing nothing
// here is intended.
216+
func (c *platformContext) Preempt() {}
217+
215218
// Release releases all platform resources used by the platformContext.
216219
func (c *platformContext) Release() {
217220
if c.sharedContext != nil {
@@ -341,6 +344,11 @@ func (*Systrap) NewContext(ctx pkgcontext.Context) platform.Context {
341344
}
342345
}
343346

347+
// ConcurrencyCount implements platform.Platform.ConcurrencyCount.
//
// It returns maxSysmsgThreads, which is defined elsewhere in this package
// (presumably the cap on sysmsg threads — TODO confirm against its
// declaration).
348+
func (*Systrap) ConcurrencyCount() int {
349+
return maxSysmsgThreads
350+
}
351+
344352
type constructor struct{}
345353

346354
func (*constructor) New(opts platform.Options) (platform.Platform, error) {

0 commit comments

Comments
 (0)