Skip to content

Commit 63c3bed

Browse files
authored
Add prevent_new_submission for resources (#3042)
1 parent 944d861 commit 63c3bed

File tree

15 files changed

+258
-14
lines changed

15 files changed

+258
-14
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ The main contents of this release are the introduction of the project sharing an
1919
* Modify `GET /auth/current-user/allowed-viewer-paths/` logic, with `include_shared_projects` query parameter (\#3031).
2020
* Add validator for paths to forbid parent-directory references (\#3031).
2121
* Review job-submission endpoint (\#3041).
22+
* Prevent submissions if `Resource.prevent_new_submissions` is set (\#3042).
2223
* App:
2324
* Add `SlowResponseMiddleware` middleware (\#3035, \#3038).
2425
* Settings:
2526
* Add `Settings.FRACTAL_LONG_REQUEST_TIME` configuration variable (\#3035).
27+
* Runner:
28+
* Prevent job-execution from continuing if `Resource.prevent_new_submissions` is set (\#3042).
2629
* Database:
30+
* Add `Resource.prevent_new_submissions` boolean flag (\#3042).
2731
* Add project-sharing-related `LinkUserProjectV2` columns (\#2999).
2832
* Move `UserOAuth.project_dir` to `.project_dirs` and drop `UserGrop.viewer_paths` (\#3031).
2933
* Enforce max one submitted `JobV2` per `DatasetV2` (\#3044).

fractal_server/app/models/v2/resource.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from sqlalchemy import Column
66
from sqlalchemy.dialects.postgresql import JSONB
77
from sqlalchemy.types import DateTime
8+
from sqlmodel import BOOLEAN
89
from sqlmodel import CheckConstraint
910
from sqlmodel import Field
1011
from sqlmodel import SQLModel
@@ -43,6 +44,18 @@ class Resource(SQLModel, table=True):
4344
Address for ssh connections, when `type="slurm_ssh"`.
4445
"""
4546

47+
prevent_new_submissions: bool = Field(
48+
sa_column=Column(
49+
BOOLEAN,
50+
server_default="false",
51+
nullable=False,
52+
),
53+
)
54+
"""
55+
When set to true: Prevent new job submissions and stop execution of
56+
ongoing jobs as soon as the current task is complete.
57+
"""
58+
4659
jobs_local_dir: str
4760
"""
4861
Base local folder for job subfolders (containing artifacts and logs).

fractal_server/app/routes/api/v2/submit.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ async def submit_job(
146146
user=user,
147147
db=db,
148148
)
149+
if resource.prevent_new_submissions:
150+
raise HTTPException(
151+
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
152+
detail=(
153+
f"The '{resource.name}' resource does not currently accept "
154+
"new job submissions."
155+
),
156+
)
149157

150158
# User appropriate FractalSSH object
151159
if resource.type == ResourceType.SLURM_SSH:

fractal_server/app/schemas/v2/resource.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ class ValidResourceBase(BaseModel):
7777
jobs_runner_config: dict[NonEmptyStr, Any]
7878
jobs_poll_interval: int = 5
7979

80+
prevent_new_submissions: bool = False
81+
8082
@model_validator(mode="after")
8183
def _pixi_slurm_config(self) -> Self:
8284
if (
@@ -95,6 +97,9 @@ class ValidResourceLocal(ValidResourceBase):
9597
Attributes:
9698
name: Resource name.
9799
type: Resource type.
100+
prevent_new_submissions:
101+
When set to true: Prevent new job submissions and stop execution of
102+
ongoing jobs as soon as the current task is complete.
98103
tasks_python_config:
99104
Configuration of Python interpreters used for task collection.
100105
tasks_pixi_config:
@@ -105,7 +110,6 @@ class ValidResourceLocal(ValidResourceBase):
105110
Local base folder for job folders.
106111
jobs_runner_config:
107112
Runner configuration.
108-
109113
"""
110114

111115
type: Literal[ResourceType.LOCAL]
@@ -121,6 +125,9 @@ class ValidResourceSlurmSudo(ValidResourceBase):
121125
Attributes:
122126
name: Resource name.
123127
type: Resource type.
128+
prevent_new_submissions:
129+
When set to true: Prevent new job submissions and stop execution of
130+
ongoing jobs as soon as the current task is complete.
124131
tasks_python_config:
125132
Configuration of Python interpreters used for task collection.
126133
tasks_pixi_config:
@@ -150,6 +157,9 @@ class ValidResourceSlurmSSH(ValidResourceBase):
150157
Attributes:
151158
name: Resource name
152159
type: Resource type.
160+
prevent_new_submissions:
161+
When set to true: Prevent new job submissions and stop execution of
162+
ongoing jobs as soon as the current task is complete.
153163
tasks_python_config:
154164
Configuration of Python interpreters used for task collection.
155165
tasks_pixi_config:
@@ -198,10 +208,9 @@ class ResourceRead(BaseModel):
198208
"""
199209

200210
id: int
201-
202-
type: str
203-
204211
name: str
212+
type: str
213+
prevent_new_submissions: bool
205214
timestamp_created: AwareDatetime
206215

207216
host: str | None
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""add_prevent_new_submissions
2+
3+
Revision ID: 88270f589c9b
4+
Revises: f0702066b007
5+
Create Date: 2025-12-02 12:34:11.028259
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
from alembic import op
11+
12+
# revision identifiers, used by Alembic.
13+
revision = "88270f589c9b"
14+
down_revision = "f0702066b007"
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade() -> None:
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
with op.batch_alter_table("resource", schema=None) as batch_op:
22+
batch_op.add_column(
23+
sa.Column(
24+
"prevent_new_submissions",
25+
sa.BOOLEAN(),
26+
server_default="false",
27+
nullable=False,
28+
)
29+
)
30+
31+
# ### end Alembic commands ###
32+
33+
34+
def downgrade() -> None:
35+
# ### commands auto generated by Alembic - please adjust! ###
36+
with op.batch_alter_table("resource", schema=None) as batch_op:
37+
batch_op.drop_column("prevent_new_submissions")
38+
39+
# ### end Alembic commands ###

fractal_server/runner/v2/_local.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,5 @@ def process_workflow(
104104
job_attribute_filters=job_attribute_filters,
105105
job_type_filters=job_type_filters,
106106
user_id=user_id,
107+
resource_id=resource.id,
107108
)

fractal_server/runner/v2/_slurm_ssh.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,5 @@ def process_workflow(
127127
job_attribute_filters=job_attribute_filters,
128128
job_type_filters=job_type_filters,
129129
user_id=user_id,
130+
resource_id=resource.id,
130131
)

fractal_server/runner/v2/_slurm_sudo.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,5 @@ def process_workflow(
123123
job_attribute_filters=job_attribute_filters,
124124
job_type_filters=job_type_filters,
125125
user_id=user_id,
126+
resource_id=resource.id,
126127
)

fractal_server/runner/v2/runner.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from fractal_server.app.models.v2 import HistoryRun
1515
from fractal_server.app.models.v2 import HistoryUnit
1616
from fractal_server.app.models.v2 import JobV2
17+
from fractal_server.app.models.v2 import Resource
1718
from fractal_server.app.models.v2 import TaskGroupV2
1819
from fractal_server.app.models.v2 import WorkflowTaskV2
1920
from fractal_server.app.schemas.v2 import HistoryUnitStatus
@@ -95,6 +96,7 @@ def execute_tasks(
9596
get_runner_config: GetRunnerConfigType,
9697
job_type_filters: dict[str, bool],
9798
job_attribute_filters: AttributeFilters,
99+
resource_id: int,
98100
) -> None:
99101
logger = get_logger(logger_name=logger_name)
100102

@@ -211,13 +213,30 @@ def execute_tasks(
211213
f"attribute_filters={job_attribute_filters})."
212214
)
213215
logger.info(error_msg)
214-
update_status_of_history_run(
215-
history_run_id=history_run_id,
216-
status=HistoryUnitStatus.FAILED,
217-
db_sync=db,
218-
)
216+
with next(get_sync_db()) as db:
217+
update_status_of_history_run(
218+
history_run_id=history_run_id,
219+
status=HistoryUnitStatus.FAILED,
220+
db_sync=db,
221+
)
219222
raise JobExecutionError(error_msg)
220223

224+
# Fail if the resource is not open for new submissions
225+
with next(get_sync_db()) as db:
226+
resource = db.get(Resource, resource_id)
227+
if resource.prevent_new_submissions:
228+
error_msg = (
229+
f"Cannot run '{task.name}', since the '{resource.name}' "
230+
"resource is not currently active."
231+
)
232+
logger.info(error_msg)
233+
update_status_of_history_run(
234+
history_run_id=history_run_id,
235+
status=HistoryUnitStatus.FAILED,
236+
db_sync=db,
237+
)
238+
raise JobExecutionError(error_msg)
239+
221240
# TASK EXECUTION
222241
try:
223242
if task.type in [

tests/v2/test_03_api/admin/test_admin_resource.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,13 @@ async def test_resource_api(
151151
# PUT one resource / success
152152
NEW_NAME = "A new name"
153153
valid_new_resource["name"] = NEW_NAME
154+
valid_new_resource["prevent_new_submissions"] = True
154155
res = await client.put(
155156
f"/admin/v2/resource/{resource_id}/", json=valid_new_resource
156157
)
157158
assert res.status_code == 200
158159
assert res.json()["name"] == NEW_NAME
160+
assert res.json()["prevent_new_submissions"] is True
159161

160162
# DELETE one resource / failure
161163
res = await client.post(

0 commit comments

Comments
 (0)