diff --git a/docs/further.md b/docs/further.md index 51ba55a9..dd999d39 100644 --- a/docs/further.md +++ b/docs/further.md @@ -64,6 +64,79 @@ See the [snakemake documentation on profiles](https://snakemake.readthedocs.io/e How and where you set configurations on factors like file size or increasing the runtime with every `attempt` of running a job (if [`--retries` is greater than `0`](https://snakemake.readthedocs.io/en/stable/executing/cli.html#snakemake.cli-get_argument_parser-behavior)). [There are detailed examples for these in the snakemake documentation.](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#dynamic-resources) +#### Automatic Partition Selection + +The SLURM executor plugin supports automatic partition selection based on job resource requirements, enabled via the command line option `--slurm-partition-config`. This feature allows the plugin to choose the most appropriate partition for each job without the need to manually specify partitions for different job types. It also enables variable partition selection when a job's resource requirements change due to [dynamic resources](#dynamic-resource-specification), ensuring that jobs are always scheduled on an appropriate partition. + +*Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.* + +##### Partition Limits Specification + +To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows: + +```yaml +partitions: + some_partition: + max_runtime: 100 + another_partition: + ... +``` +Here, `some_partition` and `another_partition` are the names of partitions on your cluster, as reported by `sinfo`.
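+
+Limits that you omit fall back to the defaults listed in the table below (most are unlimited). To enable automatic selection, pass the file on the command line, e.g. `snakemake --executor slurm --slurm-partition-config partitions.yaml` (the file name and location here are only an example), or set the equivalent option in your profile.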
+ +The following limits can be defined for each partition: + +| Parameter | Type | Description | Default | +| ----------------------- | --------- | ---------------------------------- | --------- | +| `max_runtime` | int | Maximum walltime in minutes | unlimited | +| `max_mem_mb` | int | Maximum total memory in MB | unlimited | +| `max_mem_mb_per_cpu` | int | Maximum memory per CPU in MB | unlimited | +| `max_cpus_per_task` | int | Maximum CPUs per task | unlimited | +| `max_nodes` | int | Maximum number of nodes | unlimited | +| `max_tasks` | int | Maximum number of tasks | unlimited | +| `max_tasks_per_node` | int | Maximum tasks per node | unlimited | +| `max_gpu` | int | Maximum number of GPUs | 0 | +| `available_gpu_models` | list[str] | List of available GPU models | none | +| `max_cpus_per_gpu` | int | Maximum CPUs per GPU | unlimited | +| `supports_mpi` | bool | Whether MPI jobs are supported | true | +| `max_mpi_tasks` | int | Maximum MPI tasks | unlimited | +| `available_constraints` | list[str] | List of available node constraints | none | + +##### Example Partition Configuration + +```yaml +partitions: + standard: + max_runtime: 720 # 12 hours + max_mem_mb: 64000 # 64 GB + max_cpus_per_task: 24 + max_nodes: 1 + + highmem: + max_runtime: 1440 # 24 hours + max_mem_mb: 512000 # 512 GB + max_mem_mb_per_cpu: 16000 + max_cpus_per_task: 48 + max_nodes: 1 + + gpu: + max_runtime: 2880 # 48 hours + max_mem_mb: 128000 # 128 GB + max_cpus_per_task: 32 + max_gpu: 8 + available_gpu_models: ["a100", "v100", "rtx3090"] + max_cpus_per_gpu: 8 +``` + +##### How Partition Selection Works + +When automatic partition selection is enabled, the plugin evaluates each job's resource requirements against the defined partition limits to ensure the job is placed on a partition that can accommodate all of its requirements. When multiple partitions are compatible, the plugin uses a scoring algorithm that favors partitions with limits closer to the job's needs, preventing jobs from being assigned to partitions with excessively high resource limits. + +The scoring algorithm calculates a score by summing the ratios of requested resources to partition limits (e.g., if a job requests 8 CPUs and a partition allows 16, this contributes 0.5 to the score). Higher scores indicate better resource utilization, so a job requesting 8 CPUs would prefer a 16-CPU partition (score 0.5) over a 64-CPU partition (score 0.125). + +##### Fallback Behavior + +If no suitable partition is found based on the job's resource requirements, the plugin falls back to the default SLURM behavior, which typically uses the cluster's default partition or any partition specified explicitly in the job's resources. 
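+
+To make the scoring concrete, here is a minimal, self-contained sketch of the idea. It is not the plugin's actual implementation; the function, the partition names, and the numbers are purely illustrative:
+
+```python
+from math import inf
+
+# Illustrative limits per partition; inf stands for "unlimited".
+partitions = {
+    "standard": {"runtime": 720, "mem_mb": 64000, "cpus_per_task": 24},
+    "highmem": {"runtime": 1440, "mem_mb": 512000, "cpus_per_task": 48},
+}
+
+def score(requested, limits):
+    """Return None if any request exceeds a limit, else the sum of request/limit ratios."""
+    total = 0.0
+    for resource, amount in requested.items():
+        limit = limits.get(resource, inf)
+        if amount > limit:
+            return None  # this partition cannot run the job
+        if limit != inf:
+            total += amount / limit  # closer to the limit -> higher ratio
+    return total
+
+job = {"runtime": 60, "mem_mb": 16000, "cpus_per_task": 8}
+scores = {name: s for name, lims in partitions.items() if (s := score(job, lims)) is not None}
+print(max(scores, key=scores.get), scores)  # 'standard' wins: ~0.67 vs ~0.24 for 'highmem'
+```
+
+Both partitions could run this job, but `standard` is chosen because the job uses a larger fraction of its limits, which is exactly the "closer fit" behavior described above.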
+ #### Standard Resources diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 583b0773..b8e928a5 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -3,6 +3,7 @@ __email__ = "johannes.koester@uni-due.de" __license__ = "MIT" +import atexit import csv from io import StringIO import os @@ -35,6 +36,7 @@ ) from .efficiency_report import create_efficiency_report from .submit_string import get_submit_command +from .partitions import read_partition_file, get_best_partition from .validation import validate_slurm_extra @@ -113,6 +115,15 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) + partition_config: Optional[Path] = field( + default=None, + metadata={ + "help": "Path to YAML file defining partition limits for dynamic " + "partition selection. When provided, jobs will be dynamically " + "assigned to the best-fitting partition based on their resource " + "requirements. See the documentation for the complete list of " + "available limits.", + }, + ) efficiency_report: bool = field( default=False, metadata={ @@ -201,6 +212,12 @@ def __post_init__(self, test_mode: bool = False): if self.workflow.executor_settings.logdir else Path(".snakemake/slurm_logs").resolve() ) + self._partitions = ( + read_partition_file(self.workflow.executor_settings.partition_config) + if self.workflow.executor_settings.partition_config + else None + ) + atexit.register(self.clean_old_logs) def shutdown(self) -> None: """ @@ -305,6 +322,8 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("slurm_extra"): self.check_slurm_extra(job) + # NOTE: partition is deliberately not set here, so that partition + # selection can benefit from the resource checks performed as the call is built up. job_params = { "run_uuid": self.run_uuid, "slurm_logfile": slurm_logfile, @@ -698,9 +717,13 @@ def get_partition_arg(self, job: JobExecutorInterface): returns a default partition, if applicable else raises an error - implicetly. """ + partition = None if job.resources.get("slurm_partition"): partition = job.resources.slurm_partition - else: + elif self._partitions: + partition = get_best_partition(self._partitions, job, self.logger) + # we didn't get a partition yet, so try the fallback.
+ if not partition: if self._fallback_partition is None: self._fallback_partition = self.get_default_partition(job) partition = self._fallback_partition diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py new file mode 100644 index 00000000..834d2467 --- /dev/null +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -0,0 +1,268 @@ +from dataclasses import dataclass +from typing import Optional, List +import yaml +from pathlib import Path +from math import inf, isinf +from snakemake_interface_common.exceptions import WorkflowError +from snakemake_interface_executor_plugins.jobs import ( + JobExecutorInterface, +) +from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface + + +def read_partition_file(partition_file: Path) -> List["Partition"]: + with open(partition_file, "r") as f: + out = [] + partitions_dict = yaml.safe_load(f)["partitions"] + for partition_name, partition_config in partitions_dict.items(): + if not partition_name or not partition_name.strip(): + raise KeyError("Partition name cannot be empty") + + out.append( + Partition( + name=partition_name, + limits=PartitionLimits(**partition_config), + ) + ) + return out + + +def get_best_partition( + candidate_partitions: List["Partition"], + job: JobExecutorInterface, + logger: LoggerExecutorInterface, +) -> Optional[str]: + scored_partitions = [ + (p, score) + for p in candidate_partitions + if (score := p.score_job_fit(job)) is not None + ] + + if scored_partitions: + best_partition, best_score = max(scored_partitions, key=lambda x: x[1]) + partition = best_partition.name + logger.warning( + f"Auto-selected partition '{partition}' for job {job.name} " + f"with score {best_score:.3f}" + ) + return partition + else: + logger.warning( + f"No suitable partition found for job {job.name} based on " + f"resource requirements. Falling back to default behavior." + ) + return None + + +def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str]]: + """Parse GPU requirements from job resources. Returns (count, model)""" + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert to int if it's a string representation of a number + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + if "gpu" in gres and gpu_required: + raise WorkflowError( + "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." # noqa: E501 + ) + + if gpu_required: + return int(gpu_required), job.resources.get("gpu_model") + elif "gpu" in gres: + # Parse gres string format: gpu:<count> or gpu:<model>:<count> + gpu_parts = [part for part in gres.split(",") if part.strip().startswith("gpu")] + if gpu_parts: + gpu_spec = gpu_parts[0].strip().split(":") + if len(gpu_spec) == 2: # gpu:<count> + return int(gpu_spec[1]), None + elif len(gpu_spec) == 3: # gpu:<model>:<count> + return int(gpu_spec[2]), gpu_spec[1] + + return 0, None + + +def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: + """ + This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of an arg string.
# noqa: E501 + """ + + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert gpu_required to int if it's a string + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string for the "in" check + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + has_gpu = bool(gpu_required) or "gpu" in gres + + cpus_per_task = job.resources.get("cpus_per_task") + if cpus_per_task is not None: + # Convert to int if it's a string + if isinstance(cpus_per_task, str): + try: + cpus_per_task = int(cpus_per_task) + except ValueError: + cpus_per_task = 0 + else: + cpus_per_task = int(cpus_per_task) + + if cpus_per_task < 0: + return (0, "none") + # ensure that at least 1 cpu is requested because 0 is not allowed by slurm + return (max(1, cpus_per_task), "task") + + elif has_gpu: + cpus_per_gpu = job.resources.get("cpus_per_gpu") + if cpus_per_gpu is not None: + # Convert to int if it's a string + if isinstance(cpus_per_gpu, str): + try: + cpus_per_gpu = int(cpus_per_gpu) + except ValueError: + cpus_per_gpu = 0 + else: + cpus_per_gpu = int(cpus_per_gpu) + + if cpus_per_gpu <= 0: + return (0, "none") + return (cpus_per_gpu, "gpu") + + return (job.threads, "task") + + +@dataclass +class PartitionLimits: + """Represents resource limits for a SLURM partition""" + + # Standard resources + max_runtime: float = inf # minutes + max_mem_mb: float = inf + max_mem_mb_per_cpu: float = inf + max_cpus_per_task: float = inf + + # SLURM-specific resources + max_nodes: float = inf + max_tasks: float = inf + max_tasks_per_node: float = inf + + # GPU resources + max_gpu: int = 0 + available_gpu_models: Optional[List[str]] = None + max_cpus_per_gpu: float = inf + + # MPI resources + supports_mpi: bool = True + max_mpi_tasks: float = inf + + # Node features/constraints + available_constraints: Optional[List[str]] = None + + +@dataclass +class Partition: + """Represents a SLURM partition with its properties and limits""" + + name: str + limits: PartitionLimits + + def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: + """ + Check if a job can run on this partition. If not return none. + Calculate a score for how well a partition fits the job requirements + """ + + # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job # noqa: E501 + # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources # noqa: E501 + # here a higher score indicates a better fit + # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. 
# noqa: E501 + + score = 0.0 + + numerical_resources = { + "mem_mb": self.limits.max_mem_mb, + "mem_mb_per_cpu": self.limits.max_mem_mb_per_cpu, + "runtime": self.limits.max_runtime, + "nodes": self.limits.max_nodes, + "tasks": self.limits.max_tasks, + "tasks_per_node": self.limits.max_tasks_per_node, + "mpi_tasks": self.limits.max_mpi_tasks, + } + + for resource_key, limit in numerical_resources.items(): + job_requirement = job.resources.get(resource_key, 0) + # Convert to numeric value if it's a string + if isinstance(job_requirement, str): + try: + job_requirement = float(job_requirement) + except ValueError: + job_requirement = 0 + elif not isinstance(job_requirement, (int, float)): + job_requirement = 0 + + if job_requirement > 0: + if not isinf(limit) and job_requirement > limit: + return None + if not isinf(limit): + score += job_requirement / limit + + cpu_count, cpu_type = get_job_cpu_requirement(job) + if cpu_type == "task" and cpu_count > 0: + if ( + not isinf(self.limits.max_cpus_per_task) + and cpu_count > self.limits.max_cpus_per_task + ): + return None + if not isinf(self.limits.max_cpus_per_task): + score += cpu_count / self.limits.max_cpus_per_task + elif cpu_type == "gpu" and cpu_count > 0: + if ( + not isinf(self.limits.max_cpus_per_gpu) + and cpu_count > self.limits.max_cpus_per_gpu + ): + return None + if not isinf(self.limits.max_cpus_per_gpu): + score += cpu_count / self.limits.max_cpus_per_gpu + + gpu_count, gpu_model = parse_gpu_requirements(job) + if gpu_count > 0: + if self.limits.max_gpu == 0 or gpu_count > self.limits.max_gpu: + return None + score += gpu_count / self.limits.max_gpu + + if gpu_model and self.limits.available_gpu_models: + if gpu_model not in self.limits.available_gpu_models: + return None + + if job.resources.get("mpi") and not self.limits.supports_mpi: + return None + + constraint = job.resources.get("constraint") + if constraint and self.limits.available_constraints: + # Ensure constraint is a string + if not isinstance(constraint, str): + constraint = str(constraint) + required_constraints = [ + c.strip() for c in constraint.split(",") if c.strip() + ] + if not all( + req in self.limits.available_constraints for req in required_constraints + ): + return None + + return score diff --git a/tests/tests.py b/tests/tests.py index 6dc47098..d881886b 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -7,6 +7,8 @@ from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch import pytest +import tempfile +import yaml from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.efficiency_report import ( @@ -15,6 +17,10 @@ ) from snakemake_executor_plugin_slurm.utils import set_gres_string from snakemake_executor_plugin_slurm.submit_string import get_submit_command +from snakemake_executor_plugin_slurm.partitions import ( + read_partition_file, + get_best_partition, +) from snakemake_executor_plugin_slurm.validation import validate_slurm_extra from snakemake_interface_common.exceptions import WorkflowError import pandas as pd @@ -788,6 +794,495 @@ def test_wildcard_slash_replacement(self): assert "/" not in wildcard_str +class TestPartitionSelection: + @pytest.fixture + def basic_partition_config(self): + """Basic partition configuration with two partitions.""" + return { + "partitions": { + "default": { + "max_runtime": 1440, + "max_mem_mb": 128000, + "max_cpus_per_task": 32, + "supports_mpi": True, + }, + "gpu": { + "max_runtime": 720, + "max_mem_mb": 
256000, + "max_gpu": 4, + "available_gpu_models": ["a100", "v100"], + "supports_mpi": False, + }, + } + } + + @pytest.fixture + def minimal_partition_config(self): + """Minimal partition configuration.""" + return {"partitions": {"minimal": {}}} + + @pytest.fixture + def comprehensive_partition_config(self): + """Comprehensive partition configuration with all limit types.""" + return { + "partitions": { + "comprehensive": { + # Standard resources + "max_runtime": 2880, + "max_mem_mb": 500000, + "max_mem_mb_per_cpu": 8000, + "max_cpus_per_task": 64, + # SLURM-specific resources + "max_nodes": 4, + "max_tasks": 256, + "max_tasks_per_node": 64, + # GPU resources + "max_gpu": 8, + "available_gpu_models": ["a100", "v100", "rtx3090"], + "max_cpus_per_gpu": 16, + # MPI resources + "supports_mpi": True, + "max_mpi_tasks": 512, + # Node features/constraints + "available_constraints": ["intel", "avx2", "highmem"], + } + } + } + + @pytest.fixture + def empty_partitions_config(self): + """Empty partitions configuration.""" + return {"partitions": {}} + + @pytest.fixture + def missing_name_config(self): + """Configuration with missing name field.""" + return {"partitions": {"": {}}} # Empty partition name + + @pytest.fixture + def invalid_key_config(self): + """Configuration with invalid key.""" + return {"invalid_key": []} + + @pytest.fixture + def temp_yaml_file(self): + """Helper fixture to create temporary YAML files.""" + + def _create_temp_file(config): + with tempfile.NamedTemporaryFile( + mode="w", suffix=".yaml", delete=False + ) as f: + yaml.dump(config, f) + return Path(f.name) + + return _create_temp_file + + def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file): + """Test reading a valid partition configuration file.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 2 + + # Check first partition + assert partitions[0].name == "default" + assert partitions[0].limits.max_runtime == 1440 + assert partitions[0].limits.max_mem_mb == 128000 + assert partitions[0].limits.max_cpus_per_task == 32 + assert partitions[0].limits.supports_mpi is True + + # Check second partition + assert partitions[1].name == "gpu" + assert partitions[1].limits.max_runtime == 720 + assert partitions[1].limits.max_gpu == 4 + assert partitions[1].limits.available_gpu_models == ["a100", "v100"] + assert partitions[1].limits.supports_mpi is False + + finally: + temp_path.unlink() + + def test_read_minimal_partition_file( + self, minimal_partition_config, temp_yaml_file + ): + """Test reading a partition file with minimal configuration.""" + from math import isinf + + temp_path = temp_yaml_file(minimal_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + assert partitions[0].name == "minimal" + + # Check that all limits are inf + limits = partitions[0].limits + assert isinf(limits.max_runtime) + assert isinf(limits.max_mem_mb) + assert limits.max_gpu == 0 + assert limits.supports_mpi is True + + finally: + temp_path.unlink() + + def test_read_partition_file_with_all_limits( + self, comprehensive_partition_config, temp_yaml_file + ): + """Test reading a partition file with all possible limit types.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + limits = partitions[0].limits + + # Check standard resources + assert limits.max_runtime == 2880 + assert 
limits.max_mem_mb == 500000 + assert limits.max_mem_mb_per_cpu == 8000 + assert limits.max_cpus_per_task == 64 + + # Check SLURM-specific resources + assert limits.max_nodes == 4 + assert limits.max_tasks == 256 + assert limits.max_tasks_per_node == 64 + + # Check GPU resources + assert limits.max_gpu == 8 + assert limits.available_gpu_models == ["a100", "v100", "rtx3090"] + assert limits.max_cpus_per_gpu == 16 + + # Check MPI resources + assert limits.supports_mpi is True + assert limits.max_mpi_tasks == 512 + + # Check constraints + assert limits.available_constraints == ["intel", "avx2", "highmem"] + + finally: + temp_path.unlink() + + def test_read_empty_partitions_list(self, empty_partitions_config, temp_yaml_file): + """Test reading a file with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + assert len(partitions) == 0 + + finally: + temp_path.unlink() + + def test_read_nonexistent_file(self): + """Test reading a non-existent file raises appropriate error.""" + nonexistent_path = Path("/nonexistent/path/to/file.yaml") + + with pytest.raises(FileNotFoundError): + read_partition_file(nonexistent_path) + + def test_read_invalid_yaml_file(self): + """Test reading an invalid YAML file raises appropriate error.""" + invalid_yaml = "partitions:\n - name: test\n invalid: {\n" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(invalid_yaml) + temp_path = Path(f.name) + + try: + with pytest.raises(yaml.YAMLError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_file_missing_partitions_key(self, invalid_key_config, temp_yaml_file): + """Test reading a file without 'partitions' key raises KeyError.""" + temp_path = temp_yaml_file(invalid_key_config) + + try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_partition_missing_required_fields( + self, missing_name_config, temp_yaml_file + ): + """Test reading partition with missing required fields.""" + temp_path = temp_yaml_file(missing_name_config) + + try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable resources and threads.""" + + def _create_job(threads=1, **resources): + mock_resources = MagicMock() + mock_resources.get.side_effect = lambda key, default=None: resources.get( + key, default + ) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.threads = threads + mock_job.name = "test_job" + mock_job.is_group.return_value = False + mock_job.jobid = 1 + return mock_job + + return _create_job + + @pytest.fixture + def mock_logger(self): + """Create a mock logger.""" + return MagicMock() + + def test_basic_partition_selection_cpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a basic CPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=4, mem_mb=16000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports CPU jobs + assert selected_partition == "default" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + 
+ def test_basic_partition_selection_gpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a GPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=2, mem_mb=32000, runtime=300, gpu=2, gpu_model="a100" + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports GPU jobs + assert selected_partition == "gpu" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_no_suitable_partition( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test when no partition can accommodate the job requirements.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more memory than any partition allows + job = mock_job(threads=1, mem_mb=500000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no suitable partition found + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_comprehensive_partition_selection( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with comprehensive limits.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=8, + mem_mb=64000, + runtime=1200, + gpu=2, + gpu_model="a100", + constraint="intel,avx2", + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select the comprehensive partition + assert selected_partition == "comprehensive" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_constraint_mismatch( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with constraints not available in partition.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires constraint not available in partition + job = mock_job(threads=2, constraint="amd,gpu_direct") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to constraint mismatch + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_mpi_job_selection( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test MPI job partition selection.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mpi=True, tasks=16) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports MPI, 'gpu' doesn't + assert selected_partition == "default" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args[0][0] + ) + finally: 
+ temp_path.unlink() + + def test_gpu_model_mismatch( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job with unsupported GPU model.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Request GPU model not available in any partition + job = mock_job(threads=2, gpu=1, gpu_model="rtx4090") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to GPU model mismatch + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_empty_partitions_list( + self, empty_partitions_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mem_mb=1000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no partitions available + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_gres_gpu_specification( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job specified via gres parameter.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=2, gres="gpu:v100:1", runtime=400) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports v100 GPUs + assert selected_partition == "gpu" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_cpus_per_task_specification( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with cpus_per_task specification.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, cpus_per_task=32, mem_mb=64000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select comprehensive partition as it can handle 32 cpus per task + assert selected_partition == "comprehensive" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_runtime_exceeds_limit( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with runtime exceeding partition limits.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more runtime than gpu partition allows (720 min max) + job = mock_job(threads=1, runtime=1000, gpu=1, gpu_model="a100") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None as no partition can accommodate the runtime + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + class TestSlurmExtraValidation: """Test cases for the 
validate_slurm_extra function."""