Skip to content

Commit e61459b

Browse files
Added different ways for users to set the number of vcpus. (#5890)
The following methods are used in order: - from the `QW_NUM_CPUS` environment variable - from the `KUBERNETES_LIMITS_CPU` environment variable - from the operating system - default to 2. Co-authored-by: fulmicoton <paul.masurel@datadoghq.com>
1 parent 5c9cd01 commit e61459b

File tree

2 files changed

+139
-12
lines changed

2 files changed

+139
-12
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::num::NonZero;
16+
17+
use tracing::{error, info, warn};
18+
19+
const QW_NUM_CPUS_ENV_KEY: &str = "QW_NUM_CPUS";
20+
const KUBERNETES_LIMITS_CPU: &str = "KUBERNETES_LIMITS_CPU";
21+
22+
/// Return the number of vCPU/hyperthreads available.
23+
/// The following methods are used in order:
24+
/// - from the `QW_NUM_CPUS` environment variable
25+
/// - from the `KUBERNETES_LIMITS_CPU` environment variable
26+
/// - from the operating system
27+
/// - default to 2.
28+
pub fn num_cpus() -> usize {
29+
let num_cpus_from_os_opt = std::thread::available_parallelism()
30+
.map(NonZero::get)
31+
.inspect_err(|err| {
32+
error!(error=?err, "failed to detect the number of threads available: arbitrarily returning 2");
33+
})
34+
.ok();
35+
let num_cpus_from_env_opt = get_num_cpus_from_env(QW_NUM_CPUS_ENV_KEY);
36+
let num_cpus_from_k8s_limit = get_num_cpus_from_env(KUBERNETES_LIMITS_CPU);
37+
38+
if let Some(num_cpus) = num_cpus_from_env_opt {
39+
return num_cpus;
40+
}
41+
42+
if let Some(num_cpus_from_k8s_limit) = num_cpus_from_k8s_limit {
43+
info!(
44+
"num cpus from k8s limit: {}, possibly overriding os value {:?}",
45+
num_cpus_from_k8s_limit, num_cpus_from_env_opt
46+
);
47+
return num_cpus_from_k8s_limit;
48+
}
49+
50+
if let Some(num_cpus_from_os_opt) = num_cpus_from_os_opt {
51+
info!("num cpus from os: {}", num_cpus_from_os_opt);
52+
return num_cpus_from_os_opt;
53+
}
54+
55+
warn!("failed to detect number of cpus. defaulting to 2");
56+
2
57+
}
58+
59+
fn parse_cpu_to_mcpu(cpu_string: &str) -> Result<usize, &'static str> {
60+
let trimmed_str = cpu_string.trim();
61+
62+
if trimmed_str.is_empty() {
63+
return Err("input cpu_string cannot be empty");
64+
}
65+
66+
if let Some(val_str) = trimmed_str.strip_suffix('m') {
67+
// The value is already in millicores.
68+
val_str
69+
.parse::<usize>()
70+
.map_err(|_| "invalid millicore value")
71+
} else {
72+
// The value is in CPU cores.
73+
let value = trimmed_str
74+
.parse::<f64>()
75+
.map_err(|_| "invalid float value")?;
76+
Ok((value * 1000.0f64) as usize)
77+
}
78+
}
79+
80+
// Get the number of CPUs from an environment variable.
81+
// The value is expected to be in k8s format (200m means 200 millicores, 2 means 2 cores)
82+
//
83+
// We then get the number of vCPUs by ceiling any non integer value.
84+
fn get_num_cpus_from_env(env_key: &str) -> Option<usize> {
85+
let k8s_cpu_limit_str: String = crate::get_from_env_opt(env_key)?;
86+
let mcpus = parse_cpu_to_mcpu(&k8s_cpu_limit_str)
87+
.inspect_err(|err_msg| {
88+
warn!(
89+
"failed to parse k8s cpu limit (`{}`): {}",
90+
k8s_cpu_limit_str, err_msg
91+
);
92+
})
93+
.ok()?;
94+
let num_vcpus = mcpus.div_ceil(1000);
95+
Some(num_vcpus)
96+
}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use super::*;
101+
102+
#[test]
103+
fn test_millicores() {
104+
assert_eq!(parse_cpu_to_mcpu("500m").unwrap(), 500);
105+
assert_eq!(parse_cpu_to_mcpu("100m").unwrap(), 100);
106+
assert_eq!(parse_cpu_to_mcpu("2500m").unwrap(), 2500);
107+
}
108+
109+
#[test]
110+
fn test_cores() {
111+
assert_eq!(parse_cpu_to_mcpu("1").unwrap(), 1000);
112+
assert_eq!(parse_cpu_to_mcpu("2").unwrap(), 2000);
113+
}
114+
115+
#[test]
116+
fn test_fractional_cores() {
117+
assert_eq!(parse_cpu_to_mcpu("0.5").unwrap(), 500);
118+
assert_eq!(parse_cpu_to_mcpu("1.5").unwrap(), 1500);
119+
assert_eq!(parse_cpu_to_mcpu("0.25").unwrap(), 250);
120+
}
121+
122+
#[test]
123+
fn test_with_whitespace() {
124+
assert_eq!(parse_cpu_to_mcpu(" 750m ").unwrap(), 750);
125+
assert_eq!(parse_cpu_to_mcpu(" 0.75 ").unwrap(), 750);
126+
}
127+
128+
#[test]
129+
fn test_invalid_input() {
130+
assert!(parse_cpu_to_mcpu("").is_err());
131+
assert!(parse_cpu_to_mcpu(" ").is_err());
132+
assert!(parse_cpu_to_mcpu("abc").is_err());
133+
assert!(parse_cpu_to_mcpu("1a").is_err());
134+
assert!(parse_cpu_to_mcpu("m500").is_err());
135+
assert!(parse_cpu_to_mcpu("500m1").is_err());
136+
}
137+
}

quickwit/quickwit-common/src/lib.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod coolid;
1919
#[cfg(feature = "jemalloc-profiled")]
2020
pub(crate) mod alloc_tracker;
2121
pub mod binary_heap;
22+
mod cpus;
2223
pub mod fs;
2324
pub mod io;
2425
#[cfg(feature = "jemalloc-profiled")]
@@ -56,6 +57,7 @@ use std::ops::{Range, RangeInclusive};
5657
use std::str::FromStr;
5758

5859
pub use coolid::new_coolid;
60+
pub use cpus::num_cpus;
5961
pub use kill_switch::KillSwitch;
6062
pub use path_hasher::PathHasher;
6163
pub use progress::{Progress, ProtectedZoneGuard};
@@ -199,18 +201,6 @@ pub const fn div_ceil(lhs: i64, rhs: i64) -> i64 {
199201
}
200202
}
201203

202-
/// Return the number of vCPU/hyperthreads available.
203-
/// This number is usually not equal to the number of cpu cores
204-
pub fn num_cpus() -> usize {
205-
match std::thread::available_parallelism() {
206-
Ok(num_cpus) => num_cpus.get(),
207-
Err(io_error) => {
208-
error!(error=?io_error, "failed to detect the number of threads available: arbitrarily returning 2");
209-
2
210-
}
211-
}
212-
}
213-
214204
// The following are helpers to build named tasks.
215205
//
216206
// Named tasks require the tokio feature `tracing` to be enabled. If the

0 commit comments

Comments
 (0)