diff --git a/docs/_static/img/polling-multiproc-default.svg b/docs/_static/img/polling-multiproc-default.svg
new file mode 100644
index 0000000000..689aaf0d77
--- /dev/null
+++ b/docs/_static/img/polling-multiproc-default.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/_static/img/polling-multiproc-randomize.svg b/docs/_static/img/polling-multiproc-randomize.svg
new file mode 100644
index 0000000000..ab5c913a74
--- /dev/null
+++ b/docs/_static/img/polling-multiproc-randomize.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/_static/img/polling-rates.svg b/docs/_static/img/polling-rates.svg
new file mode 100644
index 0000000000..de79b069cd
--- /dev/null
+++ b/docs/_static/img/polling-rates.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/config_reference.rst b/docs/config_reference.rst
index 58ad181b69..2d44279016 100644
--- a/docs/config_reference.rst
+++ b/docs/config_reference.rst
@@ -71,7 +71,7 @@ It consists of the following properties, which we also call conventionally *conf
:required: No
- A list of `general configuration objects <#general-configuration>`__.
+ A list of :ref:`general configuration objects <general-configuration>`.
.. py:data:: storage
@@ -1843,6 +1843,8 @@ Result storage configuration
For a detailed description of this property, have a look at the :attr:`~environments.target_systems` definition for environments.
+.. _general-configuration:
+
General Configuration
=====================
@@ -1934,6 +1936,67 @@ General Configuration
Timeout value in seconds used when checking if a git repository exists.
+.. _polling_config:
+
+.. py:attribute:: general.poll_randomize_ms
+
+ :required: No
+ :default: :obj:`None`
+
+ .. versionadded:: 4.9
+
+ Range of randomization of the polling interval in milliseconds.
+
+ If not :obj:`None`, ReFrame will randomize its polling interval within the given range, otherwise no randomization will occur.
+ The range value must be in the form ``[l, h]`` where ``l``, ``h`` are integers such that ``l<=0`` and ``h>=0``.
+
+ See ":ref:`poll-control`" for more details.
+
+
+.. py:attribute:: general.poll_rate_decay
+
+ :required: No
+ :default: 0.1
+
+ .. versionadded:: 4.9
+
+ The decay factor of the polling rate.
+
+ This is a real number in the range ``[0,1]``.
+ If the decay is ``0``, then the polling rate will be constant at :attr:`~general.poll_rate_max`.
+ If the decay is ``1``, then the polling rate will be binary taking either the :attr:`~general.poll_rate_max` or :attr:`~general.poll_rate_min` values.
+
+ See ":ref:`poll-control`" for more details.
+
+
+.. py:attribute:: general.poll_rate_max
+
+ :required: No
+ :default: 10
+
+ .. versionadded:: 4.9
+
+ The maximum desired polling rate, in polls per second.
+
+ If :attr:`~general.poll_randomize_ms` is set to a range that allows the polling interval to be decreased (``l<0``), then the effective polling rate can momentarily exceed the maximum desired rate.
+
+ See ":ref:`poll-control`" for more details.
+
+
+.. py:attribute:: general.poll_rate_min
+
+ :required: No
+ :default: 0.1
+
+ .. versionadded:: 4.9
+
+ The minimum desired polling rate, in polls per second.
+
+ When the desired polling rate reaches this value, ReFrame will not decrease it further.
+
+ See ":ref:`poll-control`" for more details.
+
+
.. py:attribute:: general.pipeline_timeout
Timeout in seconds for advancing the pipeline in the asynchronous execution policy.
diff --git a/docs/manpage.rst b/docs/manpage.rst
index 1e61ca36dc..296eb4078f 100644
--- a/docs/manpage.rst
+++ b/docs/manpage.rst
@@ -2100,6 +2100,69 @@ Whenever an environment variable is associated with a configuration option, its
.. versionadded:: 3.10.0
+.. _polling_envvars:
+
+.. envvar:: RFM_POLL_RANDOMIZE_MS
+
+ Range of randomization of the polling interval in milliseconds.
+
+ The range is specified in the form ``l,h``.
+
+ .. table::
+ :align: left
+
+ ================================== ==================
+ Associated command line option N/A
+ Associated configuration parameter :attr:`~config.general.poll_randomize_ms`
+ ================================== ==================
+
+ .. versionadded:: 4.9
+
+.. envvar:: RFM_POLL_RATE_DECAY
+
+ The decay factor of the polling rate.
+
+ .. table::
+ :align: left
+
+ ================================== ==================
+ Associated command line option N/A
+ Associated configuration parameter :attr:`~config.general.poll_rate_decay`
+ ================================== ==================
+
+ .. versionadded:: 4.9
+
+
+.. envvar:: RFM_POLL_RATE_MAX
+
+ The maximum desired polling rate.
+
+ .. table::
+ :align: left
+
+ ================================== ==================
+ Associated command line option N/A
+ Associated configuration parameter :attr:`~config.general.poll_rate_max`
+ ================================== ==================
+
+ .. versionadded:: 4.9
+
+
+.. envvar:: RFM_POLL_RATE_MIN
+
+ The minimum desired polling rate.
+
+ .. table::
+ :align: left
+
+ ================================== ==================
+ Associated command line option N/A
+ Associated configuration parameter :attr:`~config.general.poll_rate_min`
+ ================================== ==================
+
+ .. versionadded:: 4.9
+
+
.. envvar:: RFM_PREFIX
General directory prefix for ReFrame-generated directories.
diff --git a/docs/polling.rst b/docs/polling.rst
new file mode 100644
index 0000000000..e2974241a2
--- /dev/null
+++ b/docs/polling.rst
@@ -0,0 +1,59 @@
+.. _poll-control:
+
+====================================
+Understanding job polling in ReFrame
+====================================
+
+
+ReFrame executes the "compile" and "run" phases of the :doc:`test pipeline <pipeline>` by spawning "jobs" that will build and execute the test, respectively.
+A job may be a simple local process or a batch job submitted to a job scheduler, such as Slurm.
+
+ReFrame monitors the progress of its spawned jobs through polling.
+It does so in a careful way to avoid overloading the software infrastructure of the job scheduler.
+For example, it will try to poll the status of all its pending jobs at once using a single job scheduler command.
+
+ReFrame adjusts its polling rate dynamically using an exponential decay function to ensure both high interactivity and low load.
+Polling starts at a high rate and -- in the absence of any job status changes -- it gradually decays to a minimum value.
+After this point the polling rate remains constant.
+However, whenever a job completes, ReFrame resets its polling rate to the maximum, so as to quickly reap any other jobs that finish around the same time.
+
+The following figure shows the instant polling rates (desired and current) as well as the global one from the beginning of the run loop.
+The workload is a series of 6 tests, where the i-th test sleeps for ``10*i`` seconds.
+
+.. figure:: _static/img/polling-rates.svg
+ :align: center
+
+ :sub:`Instant and global polling rates of ReFrame as it executes a workload of six tests that sleep for different amounts of time. The default polling settings are used (poll_rate_max=10, poll_rate_min=0.1, poll_rate_decay=0.1)`
+
+Note how ReFrame resets the instant polling rate whenever a test job finishes.
+
+Users can control the maximum and minimum instant polling rates as well as the polling rate decay through either :ref:`environment variables <polling_envvars>` or :ref:`configuration parameters <polling_config>`.
+
+
+Polling randomization
+---------------------
+
+If multiple ReFrame processes execute the same workload at the same time, then the aggregated poll rate can be quite high, potentially stressing the batch scheduler infrastructure.
+The following picture shows the histogram of polls when running concurrently 10 ReFrame processes, each one of them executing a series of 6 tests with varying sleep times (see above):
+
+
+.. figure:: _static/img/polling-multiproc-default.svg
+ :align: center
+
+ :sub:`Poll count histogram of 10 ReFrame processes running concurrently the same workload. Each histogram bin corresponds to a second.`
+
+Note how the total polling rate can significantly exceed the maximum polling rate set in each ReFrame process.
+
+One option would be to reduce the maximum polling rate of every process, so that their aggregation falls below a certain threshold.
+Alternatively, you can instruct ReFrame to randomize the polling interval duration.
+This has a less drastic effect than reducing the maximum polling rate, but it keeps the original polling characteristics while smoothing out the spikes.
+
+The following figure shows the poll histogram obtained by setting ``RFM_POLL_RANDOMIZE_MS=-500,1500``.
+This allows ReFrame to randomly shorten the polling interval by up to 500ms or extend it by up to 1500ms.
+
+.. figure:: _static/img/polling-multiproc-randomize.svg
+ :align: center
+
+ :sub:`Poll count histogram of 10 ReFrame processes executing the same workload using polling interval randomization. Each histogram bin corresponds to a second.`
+
+Note how the spikes are now not so pronounced and polls are better distributed across time.
\ No newline at end of file
diff --git a/docs/topics.rst b/docs/topics.rst
index a660f4fc2a..a2bd06676c 100644
--- a/docs/topics.rst
+++ b/docs/topics.rst
@@ -7,5 +7,6 @@ Advanced Topics
:maxdepth: 2
pipeline
+ polling
dependencies
deferrables
diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py
index d76b95be5c..d783e86a40 100644
--- a/reframe/frontend/cli.py
+++ b/reframe/frontend/cli.py
@@ -807,6 +807,38 @@ def main():
help='Timeout for advancing the pipeline',
type=float
)
+ argparser.add_argument(
+ dest='poll_randomize_ms',
+ envvar='RFM_POLL_RANDOMIZE_MS',
+ configvar='general/poll_randomize_ms',
+ action='store',
+ type=typ.List[int],
+ help='Randomize the sleep interval between polls'
+ )
+ argparser.add_argument(
+ dest='poll_rate_decay',
+ envvar='RFM_POLL_RATE_DECAY',
+ configvar='general/poll_rate_decay',
+ action='store',
+ type=float,
+ help='Poll rate decay'
+ )
+ argparser.add_argument(
+ dest='poll_rate_max',
+ envvar='RFM_POLL_RATE_MAX',
+ configvar='general/poll_rate_max',
+ action='store',
+ type=float,
+ help='Maximum poll rate'
+ )
+ argparser.add_argument(
+ dest='poll_rate_min',
+ envvar='RFM_POLL_RATE_MIN',
+ configvar='general/poll_rate_min',
+ action='store',
+ type=float,
+ help='Minimum poll rate'
+ )
argparser.add_argument(
dest='remote_detect',
envvar='RFM_REMOTE_DETECT',
diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py
index 75c54ba5a4..a6084fec16 100644
--- a/reframe/frontend/executors/policies.py
+++ b/reframe/frontend/executors/policies.py
@@ -4,14 +4,15 @@
# SPDX-License-Identifier: BSD-3-Clause
import contextlib
-import math
+import random
import sys
import time
import reframe.core.runtime as rt
import reframe.utility as util
import reframe.utility.color as color
-from reframe.core.exceptions import (FailureLimitError,
+from reframe.core.exceptions import (ConfigError,
+ FailureLimitError,
RunSessionTimeout,
TaskExit)
from reframe.core.logging import getlogger, level_from_str
@@ -69,32 +70,83 @@ def _print_pipeline_timings(task):
class _PollController:
- SLEEP_MIN = 0.1
- SLEEP_MAX = 10
- SLEEP_INC_RATE = 1.1
+ def _validate_poll_params(self):
+ def _check_positive(x, name):
+ if x <= 0:
+ raise ConfigError(f'{name} must be a positive number')
+
+ _check_positive(self._poll_rate_min, 'minimum poll rate')
+ _check_positive(self._poll_rate_max, 'maximum poll rate')
+ if self._poll_rate_max < self._poll_rate_min:
+ raise ConfigError('maximum poll rate must be greater or equal to '
+ 'minimum poll rate')
+
+ if self._poll_rate_decay < 0 or self._poll_rate_decay > 1:
+ raise ConfigError('poll rate decay must be in range [0,1]')
+
+ if self._poll_randomize_range_ms is not None:
+ left, right = self._poll_randomize_range_ms
+ if left > 0:
+ raise ConfigError('left boundary of poll randomization range '
+ 'must be a negative integer or zero')
+
+ if right < 0:
+ raise ConfigError('right boundary of poll randomization range '
+ 'must be a positive integer or zero')
def __init__(self):
- self._num_polls = 0
- self._sleep_duration = None
- self._t_init = None
-
- def reset_snooze_time(self):
- self._sleep_duration = self.SLEEP_MIN
+ get_option = rt.runtime().get_option
+ self._poll_rate_max = get_option('general/0/poll_rate_max')
+ self._poll_rate_min = get_option('general/0/poll_rate_min')
+ self._poll_rate_decay = get_option('general/0/poll_rate_decay')
+ self._poll_randomize_range_ms = get_option(
+ 'general/0/poll_randomize_ms'
+ )
+ self._poll_count_total = 0
+ self._poll_count_interval = 0
+ self._validate_poll_params()
+ self._t_start = None
+ self._t_last_reset = None
+ self._desired_poll_rate = self._poll_rate_max
+
+ def reset_poll_rate(self):
+ getlogger().debug2('[P] reset poll rate')
+ self._poll_count_interval = 0
+ self._desired_poll_rate = self._poll_rate_max
+ self._t_last_reset = time.time()
+
+ def _poll_rate(self):
+ now = time.time()
+ return (self._poll_count_total / (now - self._t_start),
+ self._poll_count_interval / (now - self._t_last_reset))
def snooze(self):
- if self._num_polls == 0:
- self._t_init = time.time()
-
- t_elapsed = time.time() - self._t_init
- self._num_polls += 1
- poll_rate = self._num_polls / t_elapsed if t_elapsed else math.inf
- getlogger().debug2(
- f'Poll rate control: sleeping for {self._sleep_duration}s '
- f'(current poll rate: {poll_rate} polls/s)'
- )
- time.sleep(self._sleep_duration)
- self._sleep_duration = min(
- self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX
+ if self._poll_count_total == 0:
+ self._t_start = time.time()
+ self._t_last_reset = self._t_start
+ dt_sleep = 1. / self._desired_poll_rate
+ else:
+ dt_next_interval = time.time() - self._t_last_reset
+ dt_sleep = (self._poll_count_interval + 1) / self._desired_poll_rate - dt_next_interval # noqa: E501
+
+ if self._poll_randomize_range_ms:
+ sleep_eps = random.uniform(*self._poll_randomize_range_ms)
+ dt_sleep += sleep_eps / 1000
+
+ # Make sure sleep time positive
+ dt_sleep = max(0, dt_sleep)
+ time.sleep(dt_sleep)
+
+ self._poll_count_total += 1
+ self._poll_count_interval += 1
+ poll_rate_global, poll_rate_curr = self._poll_rate()
+ getlogger().debug2(f'[P] sleep_time={dt_sleep:.6f}, '
+ f'pr_desired={self._desired_poll_rate:.6f}, '
+ f'pr_current={poll_rate_curr:.6f}, '
+ f'pr_global={poll_rate_global:.6f}')
+ self._desired_poll_rate = max(
+ self._desired_poll_rate * (1 - self._poll_rate_decay),
+ self._poll_rate_min
)
@@ -227,7 +279,7 @@ def runcase(self, case):
else:
sched = partition.scheduler
- self._pollctl.reset_snooze_time()
+ self._pollctl.reset_poll_rate()
while True:
if not self.dry_run_mode:
sched.poll(task.check.job)
@@ -366,7 +418,7 @@ def exit(self):
if self._pipeline_statistics:
self._init_pipeline_progress(len(self._current_tasks))
- self._pollctl.reset_snooze_time()
+ self._pollctl.reset_poll_rate()
while self._current_tasks:
try:
self._poll_tasks()
@@ -607,7 +659,7 @@ def _abortall(self, cause):
task.abort(cause)
def on_task_exit(self, task):
- self._pollctl.reset_snooze_time()
+ self._pollctl.reset_poll_rate()
def on_task_compile_exit(self, task):
- self._pollctl.reset_snooze_time()
+ self._pollctl.reset_poll_rate()
diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json
index 4767e80977..ac03ea0fda 100644
--- a/reframe/schemas/config.json
+++ b/reframe/schemas/config.json
@@ -525,6 +525,16 @@
"dump_pipeline_progress": {"type": "boolean"},
"perf_info_level": {"$ref": "#/defs/loglevel"},
"perf_report_spec": {"type": "string"},
+ "poll_randomize_ms": {"anyOf": [
+ {"type": "null"},
+ {"type": "array",
+ "minItems": 2,
+ "maxItems": 2,
+ "items": {"type": "integer"}}
+ ]},
+ "poll_rate_decay": {"type": "number"},
+ "poll_rate_max": {"type": "number"},
+ "poll_rate_min": {"type": "number"},
"pipeline_timeout": {"type": ["number", "null"]},
"purge_environment": {"type": "boolean"},
"remote_detect": {"type": "boolean"},
@@ -601,6 +611,10 @@
"general/non_default_craype": false,
"general/perf_info_level": "info",
"general/perf_report_spec": "now-1d:now/last:/+job_nodelist+result",
+ "general/poll_randomize_ms": null,
+ "general/poll_rate_decay": 0.1,
+ "general/poll_rate_max": 10,
+ "general/poll_rate_min": 0.1,
"general/pipeline_timeout": 3,
"general/purge_environment": false,
"general/remote_detect": false,
diff --git a/tools/plot_poll_rate.py b/tools/plot_poll_rate.py
new file mode 100755
index 0000000000..76a628a182
--- /dev/null
+++ b/tools/plot_poll_rate.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+#
+# Utility script to plot ReFrame's polling rate.
+#
+# Usage:
+# ./plot_poll_rate.py LOGFILE
+#
+# This produces a diagram of the polling rate.
+#
+# ./plot_poll_rate.py LOGFILE...
+#
+# This produces a histogram of the polling counts from different ReFrame
+# processes.
+#
+# The log files must contain `debug2` information.
+
+import io
+import math
+import re
+import sys
+
+import polars as pl
+import plotly.express as px
+
+
+def read_logfile(logfile):
+ regex = re.compile(r"\[(\S+)\] debug2:.*sleep_time=(\S+), "
+ r"pr_desired=(\S+), pr_current=(\S+), pr_global=(\S+)")
+ csv_data = ''
+ with open(logfile) as fp:
+ for line in fp:
+ if m := regex.match(line):
+ csv_data += ",".join(m.groups()) + "\n"
+
+ if not csv_data:
+ return pl.DataFrame()
+
+ df = pl.read_csv(
+ io.StringIO(csv_data),
+ has_header=False,
+ new_columns=['Timestamp', 'sleep_time', 'Instant rate (desired)',
+ 'Instant rate (current)', 'Global rate']
+ ).with_columns(
+ pl.col('Timestamp').str.to_datetime()
+ )
+ return df
+
+
+def plot_poll_rates(logfile):
+ fig = px.line(
+ read_logfile(logfile),
+ x='Timestamp',
+ y=['Instant rate (desired)', 'Instant rate (current)', 'Global rate'],
+ labels={'value': 'Polling Rate (Hz)', 'variable': 'Polling rates'}
+ )
+ fig.show()
+ # fig.write_image('plot.svg')
+
+
+def plot_poll_histogram(logfiles):
+ dataframes = []
+ rfm_procs = 0
+ for filename in logfiles:
+ if not (df := read_logfile(filename)).is_empty():
+ rfm_procs += 1
+ dataframes.append(
+ df.with_columns(pl.lit(f'Process {rfm_procs}').alias('ReFrame process'))
+ )
+
+ df = pl.concat(dataframes).sort('Timestamp')
+ nbins = math.ceil((df['Timestamp'].max() - df['Timestamp'].min()).total_seconds())
+ fig = px.histogram(
+ df, x='Timestamp', color='ReFrame process', nbins=nbins
+ ).update_layout(yaxis_title='Poll count')
+ fig.show()
+ # fig.write_image('hist.svg')
+
+
+def main():
+ if len(sys.argv[1:]) == 1:
+ plot_poll_rates(sys.argv[1])
+ else:
+ plot_poll_histogram(sys.argv[1:])
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/tools/requirements.txt b/tools/requirements.txt
new file mode 100644
index 0000000000..41de5327c5
--- /dev/null
+++ b/tools/requirements.txt
@@ -0,0 +1,4 @@
+kaleido
+matplotlib
+polars
+plotly-express
diff --git a/unittests/test_logging.py b/unittests/test_logging.py
index 8bda8aa9bf..b67e4f2dd1 100644
--- a/unittests/test_logging.py
+++ b/unittests/test_logging.py
@@ -186,7 +186,7 @@ def test_rfc3339_timezone_extension(logfile, logger_with_check,
def test_rfc3339_timezone_wrong_directive(logfile, logger_without_check):
formatter = rlog.RFC3339Formatter(
fmt='[%(asctime)s] %(levelname)s: %(check_name)s: %(message)s',
- datefmt='%FT%T:z')
+ datefmt=r'%FT%T:z')
logger_without_check.logger.handlers[0].setFormatter(formatter)
logger_without_check.info('foo')
assert _pattern_in_logfile(':z', logfile)
diff --git a/unittests/test_policies.py b/unittests/test_policies.py
index 995c541b50..1e159a3583 100644
--- a/unittests/test_policies.py
+++ b/unittests/test_policies.py
@@ -17,6 +17,7 @@
import unittests.utility as test_util
from reframe.core.exceptions import (AbortTaskError,
+ ConfigError,
FailureLimitError,
ForceExitError,
RunSessionTimeout)
@@ -760,3 +761,61 @@ def test_expected_failures(make_runner, make_loader, make_exec_ctx):
assert len(runner.stats.failed()) == 4
assert len(runner.stats.xfailed()) == 2
assert len(runner.stats.xpassed()) == 2
+
+
+@pytest.fixture(params=[[100, 200], [-200, -100], [0, -100]])
+def invalid_poll_randomize_range(request):
+ return request.param
+
+
+def test_invalid_poll_randomize_ms(make_runner, make_exec_ctx,
+ invalid_poll_randomize_range):
+ make_exec_ctx(
+ options={'general/poll_randomize_ms': invalid_poll_randomize_range}
+ )
+ with pytest.raises(ConfigError):
+ make_runner()
+
+
+@pytest.fixture(params=[-1, 2])
+def invalid_poll_rate_decay(request):
+ return request.param
+
+
+def test_invalid_poll_rate_decay(make_runner, make_exec_ctx,
+ invalid_poll_rate_decay):
+ make_exec_ctx(
+ options={'general/poll_rate_decay': invalid_poll_rate_decay}
+ )
+ with pytest.raises(ConfigError):
+ make_runner()
+
+
+@pytest.fixture(params=[-1])
+def invalid_poll_rate(request):
+ return request.param
+
+
+def test_invalid_poll_rate_min(make_runner, make_exec_ctx, invalid_poll_rate):
+ make_exec_ctx(
+ options={'general/poll_rate_min': invalid_poll_rate}
+ )
+ with pytest.raises(ConfigError):
+ make_runner()
+
+
+def test_invalid_poll_rate_max(make_runner, make_exec_ctx, invalid_poll_rate):
+ make_exec_ctx(
+ options={'general/poll_rate_max': invalid_poll_rate}
+ )
+ with pytest.raises(ConfigError):
+ make_runner()
+
+
+def test_poll_randomize_range_ms(make_runner, make_exec_ctx, make_cases):
+ make_exec_ctx(
+ options={'general/poll_randomize_ms': [-100, 500]}
+ )
+ runner = make_runner()
+ runner.runall(make_cases())
+ assert_runall(runner)