Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ The main contents of this release are the introduction of the project sharing an
* Modify `GET /auth/current-user/allowed-viewer-paths/` logic, with `include_shared_projects` query parameter (\#3031).
* Add validator for paths to forbid parent-directory references (\#3031).
* Review job-submission endpoint (\#3041).
* Prevent submissions if `Resource.prevent_new_submissions` is set (\#3042).
* App:
* Add `SlowResponseMiddleware` middleware (\#3035, \#3038).
* Settings:
* Add `Settings.FRACTAL_LONG_REQUEST_TIME` configuration variable (\#3035).
* Runner:
* Prevent job-execution from continuing if `Resource.prevent_new_submissions` is set (\#3042).
* Database:
* Add `Resource.prevent_new_submissions` boolean flag (\#3042).
* Add project-sharing-related `LinkUserProjectV2` columns (\#2999).
* Move `UserOAuth.project_dir` to `.project_dirs` and drop `UserGroup.viewer_paths` (\#3031).
* Enforce max one submitted `JobV2` per `DatasetV2` (\#3044).
Expand Down
13 changes: 13 additions & 0 deletions fractal_server/app/models/v2/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.types import DateTime
from sqlmodel import BOOLEAN
from sqlmodel import CheckConstraint
from sqlmodel import Field
from sqlmodel import SQLModel
Expand Down Expand Up @@ -43,6 +44,18 @@ class Resource(SQLModel, table=True):
Address for ssh connections, when `type="slurm_ssh"`.
"""

prevent_new_submissions: bool = Field(
sa_column=Column(
BOOLEAN,
server_default="false",
nullable=False,
),
)
"""
When set to true: Prevent new job submissions and stop execution of
ongoing jobs as soon as the current task is complete.
"""

jobs_local_dir: str
"""
Base local folder for job subfolders (containing artifacts and logs).
Expand Down
8 changes: 8 additions & 0 deletions fractal_server/app/routes/api/v2/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ async def submit_job(
user=user,
db=db,
)
if resource.prevent_new_submissions:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=(
f"The '{resource.name}' resource does not currently accept "
"new job submissions."
),
)

# User appropriate FractalSSH object
if resource.type == ResourceType.SLURM_SSH:
Expand Down
17 changes: 13 additions & 4 deletions fractal_server/app/schemas/v2/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class ValidResourceBase(BaseModel):
jobs_runner_config: dict[NonEmptyStr, Any]
jobs_poll_interval: int = 5

prevent_new_submissions: bool = False

@model_validator(mode="after")
def _pixi_slurm_config(self) -> Self:
if (
Expand All @@ -95,6 +97,9 @@ class ValidResourceLocal(ValidResourceBase):
Attributes:
name: Resource name.
type: Resource type.
prevent_new_submissions:
When set to true: Prevent new job submissions and stop execution of
ongoing jobs as soon as the current task is complete.
tasks_python_config:
Configuration of Python interpreters used for task collection.
tasks_pixi_config:
Expand All @@ -105,7 +110,6 @@ class ValidResourceLocal(ValidResourceBase):
Local base folder for job folders.
jobs_runner_config:
Runner configuration.

"""

type: Literal[ResourceType.LOCAL]
Expand All @@ -121,6 +125,9 @@ class ValidResourceSlurmSudo(ValidResourceBase):
Attributes:
name: Resource name.
type: Resource type.
prevent_new_submissions:
When set to true: Prevent new job submissions and stop execution of
ongoing jobs as soon as the current task is complete.
tasks_python_config:
Configuration of Python interpreters used for task collection.
tasks_pixi_config:
Expand Down Expand Up @@ -150,6 +157,9 @@ class ValidResourceSlurmSSH(ValidResourceBase):
Attributes:
name: Resource name
type: Resource type.
prevent_new_submissions:
When set to true: Prevent new job submissions and stop execution of
ongoing jobs as soon as the current task is complete.
tasks_python_config:
Configuration of Python interpreters used for task collection.
tasks_pixi_config:
Expand Down Expand Up @@ -198,10 +208,9 @@ class ResourceRead(BaseModel):
"""

id: int

type: str

name: str
type: str
prevent_new_submissions: bool
timestamp_created: AwareDatetime

host: str | None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""add_prevent_new_submissions

Revision ID: 88270f589c9b
Revises: f0702066b007
Create Date: 2025-12-02 12:34:11.028259

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "88270f589c9b"
down_revision = "f0702066b007"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the non-nullable `resource.prevent_new_submissions` boolean column.

    The column has `server_default="false"`, so pre-existing rows are
    backfilled with `false` when the column is created.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # batch_alter_table keeps the ALTER compatible with backends that cannot
    # modify tables in place (e.g. SQLite).
    with op.batch_alter_table("resource", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "prevent_new_submissions",
                sa.BOOLEAN(),
                server_default="false",
                nullable=False,
            )
        )

    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the `resource.prevent_new_submissions` column (reverts `upgrade`)."""
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("resource", schema=None) as batch_op:
        batch_op.drop_column("prevent_new_submissions")

    # ### end Alembic commands ###
1 change: 1 addition & 0 deletions fractal_server/runner/v2/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,5 @@ def process_workflow(
job_attribute_filters=job_attribute_filters,
job_type_filters=job_type_filters,
user_id=user_id,
resource_id=resource.id,
)
1 change: 1 addition & 0 deletions fractal_server/runner/v2/_slurm_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,5 @@ def process_workflow(
job_attribute_filters=job_attribute_filters,
job_type_filters=job_type_filters,
user_id=user_id,
resource_id=resource.id,
)
1 change: 1 addition & 0 deletions fractal_server/runner/v2/_slurm_sudo.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,5 @@ def process_workflow(
job_attribute_filters=job_attribute_filters,
job_type_filters=job_type_filters,
user_id=user_id,
resource_id=resource.id,
)
29 changes: 24 additions & 5 deletions fractal_server/runner/v2/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from fractal_server.app.models.v2 import HistoryRun
from fractal_server.app.models.v2 import HistoryUnit
from fractal_server.app.models.v2 import JobV2
from fractal_server.app.models.v2 import Resource
from fractal_server.app.models.v2 import TaskGroupV2
from fractal_server.app.models.v2 import WorkflowTaskV2
from fractal_server.app.schemas.v2 import HistoryUnitStatus
Expand Down Expand Up @@ -95,6 +96,7 @@ def execute_tasks(
get_runner_config: GetRunnerConfigType,
job_type_filters: dict[str, bool],
job_attribute_filters: AttributeFilters,
resource_id: int,
) -> None:
logger = get_logger(logger_name=logger_name)

Expand Down Expand Up @@ -211,13 +213,30 @@ def execute_tasks(
f"attribute_filters={job_attribute_filters})."
)
logger.info(error_msg)
update_status_of_history_run(
history_run_id=history_run_id,
status=HistoryUnitStatus.FAILED,
db_sync=db,
)
with next(get_sync_db()) as db:
update_status_of_history_run(
history_run_id=history_run_id,
status=HistoryUnitStatus.FAILED,
db_sync=db,
)
raise JobExecutionError(error_msg)

# Fail if the resource is not open for new submissions
with next(get_sync_db()) as db:
resource = db.get(Resource, resource_id)
if resource.prevent_new_submissions:
error_msg = (
f"Cannot run '{task.name}', since the '{resource.name}' "
"resource is not currently active."
)
logger.info(error_msg)
update_status_of_history_run(
history_run_id=history_run_id,
status=HistoryUnitStatus.FAILED,
db_sync=db,
)
raise JobExecutionError(error_msg)

# TASK EXECUTION
try:
if task.type in [
Expand Down
2 changes: 2 additions & 0 deletions tests/v2/test_03_api/admin/test_admin_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@ async def test_resource_api(
# PUT one resource / success
NEW_NAME = "A new name"
valid_new_resource["name"] = NEW_NAME
valid_new_resource["prevent_new_submissions"] = True
res = await client.put(
f"/admin/v2/resource/{resource_id}/", json=valid_new_resource
)
assert res.status_code == 200
assert res.json()["name"] == NEW_NAME
assert res.json()["prevent_new_submissions"] is True

# DELETE one resource / failure
res = await client.post(
Expand Down
25 changes: 21 additions & 4 deletions tests/v2/test_03_api/test_api_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ async def test_submit_job_failures(
local_resource_profile_db,
slurm_sudo_resource_profile_db,
):
res, prof = local_resource_profile_db
res2, _ = slurm_sudo_resource_profile_db
resource1, profile1 = local_resource_profile_db
resource2, _ = slurm_sudo_resource_profile_db
async with MockCurrentUser(
user_kwargs=dict(
is_verified=True,
profile_id=prof.id,
profile_id=profile1.id,
)
) as user:
task = await task_factory(user_id=user.id)
Expand All @@ -72,7 +72,7 @@ async def test_submit_job_failures(
project2 = await project_factory(user)
workflow2 = await workflow_factory(project_id=project2.id)
# 3
project3 = await project_factory(user, resource_id=res2.id)
project3 = await project_factory(user, resource_id=resource2.id)
dataset3 = await dataset_factory(
project_id=project3.id, name="dataset3"
)
Expand Down Expand Up @@ -130,6 +130,23 @@ async def test_submit_job_failures(
"Project resource does not match with user's resource"
)

# (F) Resource cannot accept new submissions
resource1.prevent_new_submissions = True
db.add(resource1)
await db.commit()
await _workflow_insert_task(
workflow_id=workflow1b.id, task_id=task.id, db=db
)
res = await client.post(
f"{PREFIX}/project/{project1.id}/job/submit/"
f"?workflow_id={workflow1b.id}&dataset_id={dataset1.id}",
json={},
)
debug(res.json())
assert res.status_code == 422
assert resource1.name in res.json()["detail"]
assert "new job submissions" in res.json()["detail"]


async def test_submit_job_ssh_connection_failure(
db,
Expand Down
3 changes: 3 additions & 0 deletions tests/v2/test_04_runner/execute_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@


def execute_tasks_mod(
*,
wf_task_list: list[WorkflowTaskV2],
workflow_dir_local: Path,
user_id: int,
job_id: int,
job_attribute_filters: dict[str, bool] | None = None,
job_type_filters: dict[str, bool] | None = None,
resource_id: int,
**kwargs,
) -> None:
"""
Expand All @@ -27,5 +29,6 @@ def execute_tasks_mod(
job_id=job_id,
user_id=user_id,
get_runner_config=get_local_backend_config,
resource_id=resource_id,
**kwargs,
)
Loading
Loading