Skip to content

Commit f5ca910

Browse files
authored
Support placement groups for elastic fleets (#3282)
* Support placement groups for elastic fleets
* Test placement group creation
* Update run_job() signatures
* Fix non-master jobs stuck with waiting master job
* Simplify tests
* Fix run_jobs signature
1 parent 05847cd commit f5ca910

File tree

16 files changed

+451
-250
lines changed

16 files changed

+451
-250
lines changed

src/dstack/_internal/core/backends/base/compute.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,6 +111,7 @@ def run_job(
111111
project_ssh_public_key: str,
112112
project_ssh_private_key: str,
113113
volumes: List[Volume],
114+
placement_group: Optional[PlacementGroup],
114115
) -> JobProvisioningData:
115116
"""
116117
Launches a new instance for the job. It should return `JobProvisioningData` ASAP.
@@ -287,6 +288,7 @@ def run_job(
287288
project_ssh_public_key: str,
288289
project_ssh_private_key: str,
289290
volumes: List[Volume],
291+
placement_group: Optional[PlacementGroup],
290292
) -> JobProvisioningData:
291293
"""
292294
The default `run_job()` implementation for all backends that support `create_instance()`.
@@ -303,7 +305,9 @@ def run_job(
303305
)
304306
instance_offer = instance_offer.copy()
305307
self._restrict_instance_offer_az_to_volumes_az(instance_offer, volumes)
306-
return self.create_instance(instance_offer, instance_config, placement_group=None)
308+
return self.create_instance(
309+
instance_offer, instance_config, placement_group=placement_group
310+
)
307311

308312
def _restrict_instance_offer_az_to_volumes_az(
309313
self,
@@ -335,6 +339,7 @@ def run_jobs(
335339
instance_offer: InstanceOfferWithAvailability,
336340
project_ssh_public_key: str,
337341
project_ssh_private_key: str,
342+
placement_group: Optional[PlacementGroup],
338343
) -> ComputeGroupProvisioningData:
339344
pass
340345

src/dstack/_internal/core/backends/kubernetes/compute.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@
4949
Resources,
5050
SSHConnectionParams,
5151
)
52+
from dstack._internal.core.models.placement import PlacementGroup
5253
from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
5354
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
5455
from dstack._internal.core.models.volumes import Volume
@@ -131,6 +132,7 @@ def run_job(
131132
project_ssh_public_key: str,
132133
project_ssh_private_key: str,
133134
volumes: list[Volume],
135+
placement_group: Optional[PlacementGroup],
134136
) -> JobProvisioningData:
135137
instance_name = generate_unique_instance_name_for_job(run, job)
136138
assert run.run_spec.ssh_key_pub is not None

src/dstack/_internal/core/backends/local/compute.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,7 @@ def run_job(
7979
project_ssh_public_key: str,
8080
project_ssh_private_key: str,
8181
volumes: List[Volume],
82+
placement_group: Optional[PlacementGroup],
8283
) -> JobProvisioningData:
8384
return JobProvisioningData(
8485
backend=instance_offer.backend,

src/dstack/_internal/core/backends/runpod/compute.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
InstanceOfferWithAvailability,
3737
SSHKey,
3838
)
39+
from dstack._internal.core.models.placement import PlacementGroup
3940
from dstack._internal.core.models.resources import Memory, Range
4041
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
4142
from dstack._internal.core.models.volumes import Volume, VolumeProvisioningData
@@ -109,6 +110,7 @@ def run_job(
109110
project_ssh_public_key: str,
110111
project_ssh_private_key: str,
111112
volumes: List[Volume],
113+
placement_group: Optional[PlacementGroup],
112114
) -> JobProvisioningData:
113115
assert run.run_spec.ssh_key_pub is not None
114116
instance_config = InstanceConfiguration(
@@ -216,6 +218,7 @@ def run_jobs(
216218
instance_offer: InstanceOfferWithAvailability,
217219
project_ssh_public_key: str,
218220
project_ssh_private_key: str,
221+
placement_group: Optional[PlacementGroup],
219222
) -> ComputeGroupProvisioningData:
220223
master_job_configuration = job_configurations[0]
221224
master_job = master_job_configuration.job

src/dstack/_internal/core/backends/template/compute.py.jinja

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ class {{ backend_name }}Compute(
8383
project_ssh_public_key: str,
8484
project_ssh_private_key: str,
8585
volumes: List[Volume],
86+
placement_group: Optional[PlacementGroup],
8687
) -> JobProvisioningData:
8788
# TODO: Implement if create_instance() is not implemented. Delete otherwise.
8889
raise NotImplementedError()

src/dstack/_internal/core/backends/vastai/compute.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
InstanceOfferWithAvailability,
2121
InstanceRuntime,
2222
)
23+
from dstack._internal.core.models.placement import PlacementGroup
2324
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
2425
from dstack._internal.core.models.volumes import Volume
2526
from dstack._internal.utils.logging import get_logger
@@ -82,6 +83,7 @@ def run_job(
8283
project_ssh_public_key: str,
8384
project_ssh_private_key: str,
8485
volumes: List[Volume],
86+
placement_group: Optional[PlacementGroup],
8587
) -> JobProvisioningData:
8688
instance_name = generate_unique_instance_name_for_job(
8789
run, job, max_length=MAX_INSTANCE_NAME_LEN

0 commit comments

Comments
 (0)