Skip to content

Commit 931eb18

Browse files
committed
New poll rate control mechanism
1 parent 36afc5c commit 931eb18

File tree

5 files changed

+187
-29
lines changed

5 files changed

+187
-29
lines changed

reframe/frontend/cli.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,38 @@ def main():
807807
help='Timeout for advancing the pipeline',
808808
type=float
809809
)
810+
argparser.add_argument(
811+
dest='poll_randomize_ms',
812+
envvar='RFM_POLL_RANDOMIZE_MS',
813+
configvar='general/poll_randomize_ms',
814+
action='store',
815+
type=typ.List[int],
816+
help='Randomize the sleep interval between polls'
817+
)
818+
argparser.add_argument(
819+
dest='poll_rate_decay',
820+
envvar='RFM_POLL_RATE_DECAY',
821+
configvar='general/poll_rate_decay',
822+
action='store',
823+
type=float,
824+
help='Poll rate decay'
825+
)
826+
argparser.add_argument(
827+
dest='poll_rate_max',
828+
envvar='RFM_POLL_RATE_MAX',
829+
configvar='general/poll_rate_max',
830+
action='store',
831+
type=float,
832+
help='Maximum poll rate'
833+
)
834+
argparser.add_argument(
835+
dest='poll_rate_min',
836+
envvar='RFM_POLL_RATE_MIN',
837+
configvar='general/poll_rate_min',
838+
action='store',
839+
type=float,
840+
help='Minimum poll rate'
841+
)
810842
argparser.add_argument(
811843
dest='remote_detect',
812844
envvar='RFM_REMOTE_DETECT',

reframe/frontend/executors/policies.py

Lines changed: 79 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44
# SPDX-License-Identifier: BSD-3-Clause
55

66
import contextlib
7-
import math
7+
import random
88
import sys
99
import time
1010

1111
import reframe.core.runtime as rt
1212
import reframe.utility as util
1313
import reframe.utility.color as color
14-
from reframe.core.exceptions import (FailureLimitError,
14+
from reframe.core.exceptions import (ConfigError,
15+
FailureLimitError,
1516
RunSessionTimeout,
1617
TaskExit)
1718
from reframe.core.logging import getlogger, level_from_str
@@ -69,33 +70,83 @@ def _print_pipeline_timings(task):
6970

7071

7172
class _PollController:
72-
SLEEP_MIN = 0.1
73-
SLEEP_MAX = 10
74-
SLEEP_INC_RATE = 1.1
73+
def _validate_poll_params(self):
74+
def _check_positive(x, name):
75+
if x <= 0:
76+
raise ConfigError(f'{name} must be a positive number')
77+
78+
_check_positive(self._poll_rate_min, 'minimum poll rate')
79+
_check_positive(self._poll_rate_max, 'maximum poll rate')
80+
if self._poll_rate_max < self._poll_rate_min:
81+
raise ConfigError('maximum poll rate must be greater or equal to '
82+
'minimum poll rate')
83+
84+
if self._poll_rate_decay < 0 or self._poll_rate_decay > 1:
85+
raise ConfigError('poll rate decay must be in range [0,1]')
86+
87+
if self._poll_randomize_range_ms is not None:
88+
left, right = self._poll_randomize_range_ms
89+
if left > 0:
90+
raise ConfigError('left boundary of poll randomization range '
91+
'must be a negative integer or zero')
92+
93+
if right < 0:
94+
raise ConfigError('right boundary of poll randomization range '
95+
'must be a positive integer or zero')
7596

7697
def __init__(self):
77-
self._num_polls = 0
78-
self._sleep_duration = None
79-
self._t_init = None
80-
81-
def reset_snooze_time(self):
82-
self._sleep_duration = self.SLEEP_MIN
98+
get_option = rt.runtime().get_option
99+
self._poll_rate_max = get_option('general/0/poll_rate_max')
100+
self._poll_rate_min = get_option('general/0/poll_rate_min')
101+
self._poll_rate_decay = get_option('general/0/poll_rate_decay')
102+
self._poll_randomize_range_ms = get_option(
103+
'general/0/poll_randomize_ms'
104+
)
105+
self._poll_count_total = 0
106+
self._poll_count_interval = 0
107+
self._validate_poll_params()
108+
self._t_start = None
109+
self._t_last_poll = None
110+
self._desired_poll_rate = self._poll_rate_max
111+
112+
def reset_poll_rate(self):
113+
getlogger().debug2('[P] reset poll rate')
114+
self._poll_count_interval = 0
115+
self._desired_poll_rate = self._poll_rate_max
116+
117+
def _poll_rate(self):
118+
now = time.time()
119+
return (self._poll_count_total / (now - self._t_start),
120+
self._poll_count_interval / (now - self._t_last_poll))
83121

84122
def snooze(self):
85-
if self._num_polls == 0:
86-
self._t_init = time.time()
87-
88-
t_elapsed = time.time() - self._t_init
89-
self._num_polls += 1
90-
poll_rate = self._num_polls / t_elapsed if t_elapsed else math.inf
91-
getlogger().debug2(
92-
f'Poll rate control: sleeping for {self._sleep_duration}s '
93-
f'(current poll rate: {poll_rate} polls/s)'
94-
)
95-
time.sleep(self._sleep_duration)
96-
self._sleep_duration = min(
97-
self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX
123+
if self._poll_count_total == 0:
124+
self._t_start = time.time()
125+
self._t_last_poll = self._t_start
126+
dt_sleep = 1. / self._desired_poll_rate
127+
else:
128+
dt_next_interval = time.time() - self._t_last_poll
129+
dt_sleep = (self._poll_count_interval + 1) / self._desired_poll_rate - dt_next_interval # noqa: E501
130+
131+
if self._poll_randomize_range_ms:
132+
sleep_eps = random.randrange(*self._poll_randomize_range_ms)
133+
dt_sleep += sleep_eps / 1000
134+
135+
# Make sure sleep time positive
136+
dt_sleep = max(0, dt_sleep)
137+
time.sleep(dt_sleep)
138+
139+
self._poll_count_total += 1
140+
self._poll_count_interval += 1
141+
poll_rate_global, poll_rate_curr = self._poll_rate()
142+
getlogger().debug2(f'[P] sleep_time={dt_sleep:.6f}, '
143+
f'pr_desired={self._desired_poll_rate:.6f}, '
144+
f'pr_current={poll_rate_curr:.6f}, '
145+
f'pr_global={poll_rate_global:.6f}')
146+
self._desired_poll_rate = max(
147+
self._desired_poll_rate*self._poll_rate_decay, self._poll_rate_min
98148
)
149+
self._t_last_poll = time.time()
99150

100151

101152
class _PolicyEventListener(TaskEventListener):
@@ -227,7 +278,7 @@ def runcase(self, case):
227278
else:
228279
sched = partition.scheduler
229280

230-
self._pollctl.reset_snooze_time()
281+
self._pollctl.reset_poll_rate()
231282
while True:
232283
if not self.dry_run_mode:
233284
sched.poll(task.check.job)
@@ -366,7 +417,7 @@ def exit(self):
366417
if self._pipeline_statistics:
367418
self._init_pipeline_progress(len(self._current_tasks))
368419

369-
self._pollctl.reset_snooze_time()
420+
self._pollctl.reset_poll_rate()
370421
while self._current_tasks:
371422
try:
372423
self._poll_tasks()
@@ -607,7 +658,7 @@ def _abortall(self, cause):
607658
task.abort(cause)
608659

609660
def on_task_exit(self, task):
610-
self._pollctl.reset_snooze_time()
661+
self._pollctl.reset_poll_rate()
611662

612663
def on_task_compile_exit(self, task):
613-
self._pollctl.reset_snooze_time()
664+
self._pollctl.reset_poll_rate()

reframe/schemas/config.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,16 @@
524524
"dump_pipeline_progress": {"type": "boolean"},
525525
"perf_info_level": {"$ref": "#/defs/loglevel"},
526526
"perf_report_spec": {"type": "string"},
527+
"poll_randomize_ms": {"anyOf": [
528+
{"type": "null"},
529+
{"type": "array",
530+
"minItems": 2,
531+
"maxItems": 2,
532+
"items": {"type": "integer"}}
533+
]},
534+
"poll_rate_decay": {"type": "number"},
535+
"poll_rate_max": {"type": "number"},
536+
"poll_rate_min": {"type": "number"},
527537
"pipeline_timeout": {"type": ["number", "null"]},
528538
"purge_environment": {"type": "boolean"},
529539
"remote_detect": {"type": "boolean"},
@@ -600,6 +610,10 @@
600610
"general/non_default_craype": false,
601611
"general/perf_info_level": "info",
602612
"general/perf_report_spec": "now-1d:now/last:/+job_nodelist+result",
613+
"general/poll_randomize_ms": null,
614+
"general/poll_rate_decay": 0.9,
615+
"general/poll_rate_max": 10,
616+
"general/poll_rate_min": 0.1,
603617
"general/pipeline_timeout": 3,
604618
"general/purge_environment": false,
605619
"general/remote_detect": false,

tools/plot_poll_rate.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env python3
2+
3+
import io
4+
import re
5+
import sys
6+
7+
import polars as pl
8+
import plotly.express as px
9+
10+
11+
def read_logfile(logfile):
12+
regex = re.compile(r"\[(\S+)\] debug2:.*sleep_time=(\S+), "
13+
r"pr_desired=(\S+), pr_current=(\S+), pr_global=(\S+)")
14+
csv_data = ''
15+
with open(logfile) as fp:
16+
for line in fp:
17+
if m := regex.match(line):
18+
csv_data += ",".join(m.groups()) + "\n"
19+
20+
if not csv_data:
21+
return pl.DataFrame()
22+
23+
df = pl.read_csv(
24+
io.StringIO(csv_data),
25+
has_header=False,
26+
new_columns=['timestamp', 'sleep_time', 'pr_desired',
27+
'pr_current', 'pr_global']
28+
).with_columns(
29+
pl.col('timestamp').str.to_datetime()
30+
)
31+
return df
32+
33+
34+
def plot_poll_rates(logfile):
35+
px.line(read_logfile(logfile),
36+
x='timestamp', y=['pr_desired', 'pr_current', 'pr_global']).show()
37+
38+
39+
def plot_poll_histogram(logfiles):
40+
dataframes = []
41+
for filename in logfiles:
42+
if not (df := read_logfile(filename)).is_empty():
43+
dataframes.append(
44+
df.with_columns(pl.lit(filename).alias('filename'))
45+
)
46+
47+
fig = px.histogram(pl.concat(dataframes).sort('timestamp'), x='timestamp', color='filename', nbins=100)
48+
fig.show()
49+
50+
51+
def main():
52+
if len(sys.argv[1:]) == 1:
53+
plot_poll_rates(sys.argv[1])
54+
else:
55+
plot_poll_histogram(sys.argv[1:])
56+
57+
return 0
58+
59+
60+
if __name__ == '__main__':
61+
sys.exit(main())

unittests/test_logging.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def test_rfc3339_timezone_extension(logfile, logger_with_check,
186186
def test_rfc3339_timezone_wrong_directive(logfile, logger_without_check):
187187
formatter = rlog.RFC3339Formatter(
188188
fmt='[%(asctime)s] %(levelname)s: %(check_name)s: %(message)s',
189-
datefmt='%FT%T:z')
189+
datefmt=r'%FT%T:z')
190190
logger_without_check.logger.handlers[0].setFormatter(formatter)
191191
logger_without_check.info('foo')
192192
assert _pattern_in_logfile(':z', logfile)

0 commit comments

Comments
 (0)