Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 231 additions & 27 deletions pkg/util/admission/cpu_time_token_filler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@ package admission
import (
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// KVCPUTimeUtilGoal is the target CPU utilization for the KV CPU time token
// system. The per-bucket token refill rates are derived from this goal; see
// cpuTimeTokenLinearModel.updateRefillRates.
var KVCPUTimeUtilGoal = settings.RegisterFloatSetting(
	settings.SystemOnly,
	"admission.cpu_time_tokens.target_util",
	"the target CPU utilization for the KV CPU time token system", 0.8)

// timePerTick is how frequently cpuTimeTokenFiller ticks its time.Ticker & adds
// tokens to the buckets. Must be < 1s. Must divide 1s evenly (with 1ms there
// are 1000 ticks per 1s interval).
const timePerTick = 1 * time.Millisecond

// cpuTimeTokenFiller starts a goroutine which periodically calls
// cpuTimeTokenAllocator to add tokens to a cpuTimeTokenGranter. For example, on
// an 8 vCPU machine, we may want to allow burstable tier-0 work to use 6 seconds
// of CPU time per second. Then cpuTimeTokenAllocator.rates[tier0][canBurst] would
// of CPU time per second. Then the refill rates for tier0 burstable work would
// equal 6 seconds per second, and cpuTimeTokenFiller would add 6 seconds of token
// every second, but smoothly -- 1ms at a time. See cpuTimeTokenGranter for details
// on the multi-dimensional token buckets owned by cpuTimeTokenGranter; the TLDR is
Expand Down Expand Up @@ -55,13 +62,12 @@ type cpuTimeTokenFiller struct {
tickCh *chan struct{}
}

// tokenAllocator abstracts cpuTimeTokenAllocator for testing.
type tokenAllocator interface {
allocateTokens(remainingTicksInInInterval int64)
resetInterval()
}

func (f *cpuTimeTokenFiller) start() {
// The token buckets starts full.
f.allocator.init()
f.allocator.allocateTokens(1)
f.allocator.resetInterval(true /* skipFittingLinearModel */)

ticker := f.timeSource.NewTicker(timePerTick)
intervalStart := f.timeSource.Now()
// Every 1s a new interval starts. every timePerTick time token allocation
Expand All @@ -85,7 +91,7 @@ func (f *cpuTimeTokenFiller) start() {
f.allocator.allocateTokens(1)
}
intervalStart = t
f.allocator.resetInterval()
f.allocator.resetInterval(false /* skipFittingLinearModel */)
remainingTicks = int64(time.Second / timePerTick)
} else {
remainingSinceIntervalStart := time.Second - elapsedSinceIntervalStart
Expand All @@ -111,31 +117,36 @@ func (f *cpuTimeTokenFiller) start() {
}()
}

// tokenAllocator abstracts cpuTimeTokenAllocator for testing.
type tokenAllocator interface {
	// init prepares the allocator; called once by cpuTimeTokenFiller.start
	// before any token allocation happens.
	init()
	// allocateTokens adds tokens to the buckets, spreading the per-interval
	// refill evenly across the expected remaining ticks in the interval.
	allocateTokens(remainingTicksInInterval int64)
	// resetInterval signals the beginning of a new interval (1s). When
	// skipFittingLinearModel is true, the linear model is not refit first.
	resetInterval(skipFittingLinearModel bool)
}

// cpuTimeTokenAllocator allocates tokens to a cpuTimeTokenGranter. See the comment
// above cpuTimeTokenFiller for a high level picture. The responsibility of
// cpuTimeTokenAllocator is to gradually allocate rates tokens every interval,
// while respecting bucketCapacity. We have split up the ticking & token allocation
// logic, in order to improve clarity & testability.
// cpuTimeTokenAllocator is to gradually allocate tokens every interval,
// while respecting bucketCapacity. The computation of the rate of tokens to add
// every interval is left to cpuTimeTokenLinearModel.
type cpuTimeTokenAllocator struct {
granter *cpuTimeTokenGranter
model cpuTimeModel

// Mutable fields. No mutex, since only a single goroutine will call the
// cpuTimeTokenAllocator.

// rates stores the number of token added to each bucket every interval.
rates [numResourceTiers][numBurstQualifications]int64
// bucketCapacity stores the maximum number of tokens that can be in each bucket.
// That is, if a bucket is already at capacity, no more tokens will be added.
bucketCapacity [numResourceTiers][numBurstQualifications]int64
// allocated stores the number of tokens added to each bucket in the current
// interval.
// cpuTimeTokenAllocator. No mutex, since only a single goroutine will call
// the interval.
allocated [numResourceTiers][numBurstQualifications]int64
}

// Compile-time check that cpuTimeTokenAllocator implements tokenAllocator.
var _ tokenAllocator = &cpuTimeTokenAllocator{}

// init initializes the underlying model. Called once by
// cpuTimeTokenFiller.start before token allocation begins.
func (a *cpuTimeTokenAllocator) init() {
	a.model.init()
}

// allocateTokens allocates tokens to a cpuTimeTokenGranter. allocateTokens
// adds rates tokens every interval, while respecting bucketCapacity.
// adds the desired number of tokens every interval, while respecting bucketCapacity.
// allocateTokens adds tokens evenly among the expected remaining ticks in
// the interval.
// INVARIANT: remainingTicks >= 1.
Expand All @@ -156,24 +167,217 @@ func (a *cpuTimeTokenAllocator) allocateTokens(expectedRemainingTicksInInterval
return toAllocate
}

rates := a.model.getRefillRates()
var delta [numResourceTiers][numBurstQualifications]int64
for wc := range a.rates {
for kind := range a.rates[wc] {
for wc := range rates {
for kind := range rates[wc] {
toAllocateTokens := allocateFunc(
a.rates[wc][kind], a.allocated[wc][kind], expectedRemainingTicksInInterval)
rates[wc][kind], a.allocated[wc][kind], expectedRemainingTicksInInterval)
a.allocated[wc][kind] += toAllocateTokens
delta[wc][kind] = toAllocateTokens
}
}
a.granter.refill(delta, a.bucketCapacity)
a.granter.refill(delta, rates)
}

// resetInterval is called to signal the beginning of a new interval. allocateTokens
// adds rates tokens every interval.
func (a *cpuTimeTokenAllocator) resetInterval() {
// adds the desired number of tokens every interval.
func (a *cpuTimeTokenAllocator) resetInterval(skipFittingLinearModel bool) {
if !skipFittingLinearModel {
// delta is the difference in tokens to add per interval (1s) from previous
// call to fit to this one. We add it immediately to the bucket. The model itself
// handles filtering.
delta := a.model.fit()
a.granter.refill(delta, a.model.getRefillRates())
}
for wc := range a.allocated {
for kind := range a.allocated[wc] {
a.allocated[wc][kind] = 0
}
}
}

// cpuTimeModel abstracts cpuTimeTokenLinearModel for testing.
type cpuTimeModel interface {
	// init sets the model to its initial state (multiplier of 1) and
	// computes initial refill rates.
	init()
	// fit refits the model and recomputes the refill rates, returning the
	// change (delta) in tokens to add per interval (1s) since the previous
	// fit.
	fit() [numResourceTiers][numBurstQualifications]int64
	// getRefillRates returns the number of tokens to add to each bucket per
	// interval (1s).
	getRefillRates() [numResourceTiers][numBurstQualifications]int64
}

// cpuTimeTokenLinearModel computes the number of CPU time tokens to add
// to each bucket in the cpuTimeTokenGranter, per interval (per 1s). As is
// discussed in the cpuTimeTokenGranter docs, the buckets are arranged in a
// priority hierarchy; some buckets always have more tokens added per second
// than others.
//
// Consider the refill rate of the lowest priority bucket first. In that case:
// refillRate = 1s * vCPU count * targetUtilization / linearCorrectionTerm
//
// This formula is intuitive if linearCorrectionTerm equals 1. If CPU time
// tokens corresponded exactly to actual CPU time, e.g. one token corresponds
// to one nanosecond of CPU time, it would imply the token bucket will limit
// the CPU used by requests subject to it to targetUtilization. Note that
// targetUtilization is controllable via the admission.cpu_time_tokens.target_util
// cluster setting.
//
// Example:
// 8 vCPU machine. admission.cpu_time_tokens.target_util = 0.8 (80%)
// RefillRate = 8 * 1s * .8 = 6.4 seconds of CPU time per second
//
// linearCorrectionTerm is a correction term derived from a linear model (hence
// the name of the struct). There is work happening outside the kvserver BatchRequest
// evaluation path, such as compaction. AC continuously fits a linear model:
// total-cpu-time = linearCorrectionTerm * reported-cpu-time, where
// linearCorrectionTerm is forced to be in the interval [1, 20].
//
// The higher priority buckets have higher target utilizations than the lowest
// priority one. The delta between the target utilizations is fixed, e.g.
// burstable tenant work has a 5% higher target utilization than non-burstable.
type cpuTimeTokenLinearModel struct {
	granter           tokenUsageTracker
	settings          *cluster.Settings
	cpuMetricProvider CPUMetricProvider
	timeSource        timeutil.TimeSource

	// The time that fit was called last.
	lastFitTime time.Time
	// The cumulative user/sys CPU time used since process start in
	// milliseconds.
	totalCPUTimeMillis int64
	// The CPU capacity measured in vCPUs.
	cpuCapacity float64
	// The linear correction term, see the docs above cpuTimeTokenLinearModel.
	tokenToCPUTimeMultiplier float64

	// The number of CPU time tokens to add to each bucket per interval (1s).
	rates [numResourceTiers][numBurstQualifications]int64
}

// tokenUsageTracker abstracts the granter's token-usage accounting for the
// model (and for testing).
type tokenUsageTracker interface {
	// getTokensUsedInInterval returns the net number of tokens deducted in
	// the interval. May be <= 0, since corrections are applied after
	// requests finish processing (see cpuTimeTokenLinearModel.fit).
	getTokensUsedInInterval() int64
}

// CPUMetricProvider provides process-wide CPU usage & capacity measurements.
type CPUMetricProvider interface {
	// GetCPUInfo returns the cumulative user/sys CPU time used since process
	// start in milliseconds, and the cpuCapacity measured in vCPUs.
	GetCPUInfo() (totalCPUTimeMillis int64, cpuCapacity float64)
}

// init sets tokenToCPUTimeMultiplier to 1 & computes initial refill rates.
func (m *cpuTimeTokenLinearModel) init() {
	m.lastFitTime = m.timeSource.Now()
	_, cpuCapacity := m.cpuMetricProvider.GetCPUInfo()
	m.cpuCapacity = cpuCapacity
	m.tokenToCPUTimeMultiplier = 1
	// The delta returned by updateRefillRates is unused here; the caller
	// (resetInterval / start) handles the initial fill.
	_ = m.updateRefillRates()
}

// fit adjusts tokenToCPUTimeMultiplier based on CPU usage & token usage. fit
// computes refill rates from tokenToCPUTimeMultiplier & the admission.cpu_time_tokens.target_util
// cluster setting. fit returns the delta refill rates. That is, fit returns the difference in tokens
// to add per interval (1s) from the previous call to fit to this one.
func (m *cpuTimeTokenLinearModel) fit() [numResourceTiers][numBurstQualifications]int64 {
	now := m.timeSource.Now()
	elapsedSinceLastFit := now.Sub(m.lastFitTime)
	m.lastFitTime = now

	// Get used CPU tokens.
	tokensUsed := m.granter.getTokensUsedInInterval()
	// At admission time, an estimate of CPU time is deducted. After
	// the request is done processing, a correction based on a measurement
	// from grunning is deducted. Thus it is theoretically possible for net
	// tokens used to be <=0. In this case, we set tokensUsed to 1, so that
	// the computation of tokenToCPUTimeMultiplier is well-behaved.
	if tokensUsed <= 0 {
		tokensUsed = 1
	}

	// Get used CPU time.
	totalCPUTimeMillis, _ := m.cpuMetricProvider.GetCPUInfo()
	intCPUTimeMillis := totalCPUTimeMillis - m.totalCPUTimeMillis
	// totalCPUTimeMillis is not necessarily monotonic in all environments,
	// e.g. in case of VM live migration on a public cloud provider.
	if intCPUTimeMillis < 0 {
		intCPUTimeMillis = 0
	}
	m.totalCPUTimeMillis = totalCPUTimeMillis
	intCPUTimeNanos := intCPUTimeMillis * 1e6

	// Update multiplier. Utilization is "low" when the CPU time used in the
	// interval is below lowCPUUtilFrac of the capacity available over the
	// elapsed wall time.
	const lowCPUUtilFrac = 0.25
	isLowCPUUtil := intCPUTimeNanos < int64(float64(elapsedSinceLastFit)*m.cpuCapacity*lowCPUUtilFrac)
	if isLowCPUUtil {
		// Ensure that low CPU utilization is not due to a flawed tokenToCPUTimeMultiplier
		// by multiplicatively lowering it until we are below the upperBound. If we are already
		// below upperBound, we make no adjustment.
		const upperBound = (1 / lowCPUUtilFrac) * 0.9 // 3.6
		if m.tokenToCPUTimeMultiplier > upperBound {
			m.tokenToCPUTimeMultiplier /= 1.5
			if m.tokenToCPUTimeMultiplier < upperBound {
				m.tokenToCPUTimeMultiplier = upperBound
			}
		}
	} else {
		tokenToCPUTimeMultiplier :=
			float64(intCPUTimeNanos) / float64(tokensUsed)
		// Multiplier is forced into the interval [1, 20].
		if tokenToCPUTimeMultiplier > 20 {
			// Cap the multiplier.
			tokenToCPUTimeMultiplier = 20
		} else if tokenToCPUTimeMultiplier < 1 {
			// Likely because work is queued up in the goroutine scheduler.
			tokenToCPUTimeMultiplier = 1
		}
		// Decrease faster than increase. Giving out too many tokens can
		// lead to goroutine scheduling latency.
		alpha := 0.5
		if tokenToCPUTimeMultiplier < m.tokenToCPUTimeMultiplier {
			alpha = 0.8
		}

		// Exponentially filter changes to the multiplier. 1s of data is noisy,
		// so filtering is necessary.
		m.tokenToCPUTimeMultiplier =
			alpha*tokenToCPUTimeMultiplier + (1-alpha)*m.tokenToCPUTimeMultiplier
	}

	return m.updateRefillRates()
}

// updateRefillRates recomputes the per-interval (1s) refill rates from
// tokenToCPUTimeMultiplier & the admission.cpu_time_tokens.target_util
// cluster setting, and returns the delta: the difference in tokens to add
// per interval between this computation and the previous one.
func (m *cpuTimeTokenLinearModel) updateRefillRates() [numResourceTiers][numBurstQualifications]int64 {
	// Each bucket gets a target utilization derived from the cluster setting:
	// the lowest priority bucket gets the base goal, and each step up in
	// priority adds 0.05. Algorithmically, it is okay if some goals exceed 1.
	// This would mean greater risk of goroutine scheduling latency, but there
	// is no immediate problem -- the greater some goal is, the more CPU time
	// tokens will be in the corresponding bucket.
	util := KVCPUTimeUtilGoal.Get(&m.settings.SV)
	var delta [numResourceTiers][numBurstQualifications]int64
	// step counts (tier, qualification) pairs from lowest priority upward.
	var step float64
	for tier := int(numResourceTiers - 1); tier >= 0; tier-- {
		for qual := int(numBurstQualifications - 1); qual >= 0; qual-- {
			goal := util + 0.05*step
			step++
			updated :=
				int64(m.cpuCapacity * float64(time.Second) * goal / m.tokenToCPUTimeMultiplier)
			delta[tier][qual] = updated - m.rates[tier][qual]
			m.rates[tier][qual] = updated
		}
	}
	return delta
}

// getRefillRates returns the number of CPU time tokens to add to each bucket
// per interval (1s). The rates are recomputed by updateRefillRates, which is
// driven by init and fit.
func (m *cpuTimeTokenLinearModel) getRefillRates() [numResourceTiers][numBurstQualifications]int64 {
	return m.rates
}
Loading