Commit 944d861

Add unique index for submitted jobs (#3044)
1 parent c2f8fd6 commit 944d861

File tree

11 files changed (+219, -121 lines)

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -4,6 +4,8 @@
 
 > NOTE: This version requires running a data-migration script (`fractalctl update-db-data`).
 
+> WARNING: Before upgrading to this version, make sure that no jobs are marked as submitted in the current database tables.
+
 The main contents of this release are the introduction of the project sharing and a review of the authorization scheme for [`fractal-data`](https://github.com/fractal-analytics-platform/fractal-data).
 
 * API:
@@ -24,6 +26,7 @@ The main contents of this release are the introduction of the project sharing an
 * Database:
     * Add project-sharing-related `LinkUserProjectV2` columns (\#2999).
    * Move `UserOAuth.project_dir` to `.project_dirs` and drop `UserGrop.viewer_paths` (\#3031).
+    * Enforce max one submitted `JobV2` per `DatasetV2` (\#3044).
 * Settings:
     * Drop `DataSettings` (\#3031).
 * Reduce API logging level for some endpoints (\#3010).

fractal_server/app/models/v2/job.py

Lines changed: 11 additions & 0 deletions

@@ -6,7 +6,9 @@
 from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.types import DateTime
 from sqlmodel import Field
+from sqlmodel import Index
 from sqlmodel import SQLModel
+from sqlmodel import text
 
 from fractal_server.app.schemas.v2 import JobStatusType
 from fractal_server.utils import get_timestamp
@@ -66,3 +68,12 @@ class JobV2(SQLModel, table=True):
     type_filters: dict[str, bool] = Field(
         sa_column=Column(JSONB, nullable=False, server_default="{}")
     )
+
+    __table_args__ = (
+        Index(
+            "ix_jobv2_one_submitted_job_per_dataset",
+            "dataset_id",
+            unique=True,
+            postgresql_where=text(f"status = '{JobStatusType.SUBMITTED}'"),
+        ),
+    )
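
The `__table_args__` entry above declares a PostgreSQL partial unique index: uniqueness of `dataset_id` is enforced only over rows whose status is `submitted`, so any number of finished or failed jobs per dataset remain legal. As a minimal sketch (not part of this commit; the stripped-down `jobv2` table stub is an assumption for illustration), the DDL this definition emits can be rendered like so:

    # Illustrative sketch: compile the partial unique index to PostgreSQL DDL.
    # Only the indexed/filtered columns of "jobv2" are modeled here.
    import sqlalchemy as sa
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.schema import CreateIndex

    metadata = sa.MetaData()
    jobv2 = sa.Table(
        "jobv2",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("dataset_id", sa.Integer),
        sa.Column("status", sa.String),
    )
    index = sa.Index(
        "ix_jobv2_one_submitted_job_per_dataset",
        jobv2.c.dataset_id,
        unique=True,
        postgresql_where=sa.text("status = 'submitted'"),
    )
    print(CreateIndex(index).compile(dialect=postgresql.dialect()))
    # -> CREATE UNIQUE INDEX ix_jobv2_one_submitted_job_per_dataset
    #    ON jobv2 (dataset_id) WHERE status = 'submitted'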

fractal_server/app/routes/api/v2/_aux_functions.py

Lines changed: 0 additions & 36 deletions

@@ -7,7 +7,6 @@
 
 from fastapi import HTTPException
 from fastapi import status
-from sqlalchemy.exc import MultipleResultsFound
 from sqlalchemy.orm.attributes import flag_modified
 from sqlmodel import select
 from sqlmodel.sql.expression import SelectOfScalar
@@ -554,41 +553,6 @@ async def _get_workflowtask_or_404(
     return wftask
 
 
-async def _get_submitted_job_or_none(
-    *,
-    dataset_id: int,
-    workflow_id: int,
-    db: AsyncSession,
-) -> JobV2 | None:
-    """
-    Get the submitted job for given dataset/workflow, if any.
-
-    This function also handles the invalid branch where more than one job
-    is found.
-
-    Args:
-        dataset_id:
-        workflow_id:
-        db:
-    """
-    res = await db.execute(
-        _get_submitted_jobs_statement()
-        .where(JobV2.dataset_id == dataset_id)
-        .where(JobV2.workflow_id == workflow_id)
-    )
-    try:
-        return res.scalars().one_or_none()
-    except MultipleResultsFound as e:
-        error_msg = (
-            f"Multiple running jobs found for {dataset_id=} and {workflow_id=}."
-        )
-        logger.error(f"{error_msg} Original error: {str(e)}.")
-        raise HTTPException(
-            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
-            detail=error_msg,
-        )
-
-
 async def _get_user_resource_id(user_id: int, db: AsyncSession) -> int | None:
     res = await db.execute(
         select(Resource.id)
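
The deleted helper existed mainly to convert `MultipleResultsFound` into an HTTP 422. With the partial unique index guaranteeing at most one submitted job per dataset, that branch is unreachable, so the two router diffs below inline the query and call `one_or_none()` directly. A self-contained sketch (SQLite in-memory; table and values are hypothetical) of the exception the helper used to guard against:

    # Sketch of why the try/except was needed before the index existed:
    # ScalarResult.one_or_none() raises MultipleResultsFound when the query
    # matches two or more rows.
    import sqlalchemy as sa
    from sqlalchemy.exc import MultipleResultsFound

    engine = sa.create_engine("sqlite://")
    with engine.begin() as conn:
        conn.execute(sa.text("CREATE TABLE job (id INTEGER, dataset_id INTEGER)"))
        # Two "submitted" jobs for the same dataset: legal without the index.
        conn.execute(sa.text("INSERT INTO job VALUES (1, 7), (2, 7)"))

    with engine.connect() as conn:
        res = conn.execute(sa.text("SELECT id FROM job WHERE dataset_id = 7"))
        try:
            res.scalars().one_or_none()
        except MultipleResultsFound:
            print("multiple rows -> the old helper mapped this to HTTP 422")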

fractal_server/app/routes/api/v2/history.py

Lines changed: 7 additions & 5 deletions

@@ -33,7 +33,7 @@
 from fractal_server.logger import set_logger
 
 from ._aux_functions import _get_dataset_check_access
-from ._aux_functions import _get_submitted_job_or_none
+from ._aux_functions import _get_submitted_jobs_statement
 from ._aux_functions import _get_workflow_check_access
 from ._aux_functions_history import _verify_workflow_and_dataset_access
 from ._aux_functions_history import get_history_run_or_404
@@ -90,11 +90,13 @@ async def get_workflow_tasks_statuses(
         db=db,
     )
 
-    running_job = await _get_submitted_job_or_none(
-        db=db,
-        dataset_id=dataset_id,
-        workflow_id=workflow_id,
+    res = await db.execute(
+        _get_submitted_jobs_statement()
+        .where(JobV2.dataset_id == dataset_id)
+        .where(JobV2.workflow_id == workflow_id)
     )
+    running_job = res.scalars().one_or_none()
+
     if running_job is not None:
         running_wftasks = workflow.task_list[
             running_job.first_task_index : running_job.last_task_index + 1

fractal_server/app/routes/api/v2/status_legacy.py

Lines changed: 6 additions & 20 deletions

@@ -1,7 +1,5 @@
 from fastapi import APIRouter
 from fastapi import Depends
-from fastapi import HTTPException
-from fastapi import status
 
 from fractal_server.app.db import AsyncSession
 from fractal_server.app.db import get_async_db
@@ -64,24 +62,12 @@ async def get_workflowtask_status(
     # Check whether there exists a submitted job associated to this
     # workflow/dataset pair. If it does exist, it will be used later.
     # If there are multiple jobs, raise an error.
-    stm = _get_submitted_jobs_statement()
-    stm = stm.where(JobV2.dataset_id == dataset_id)
-    stm = stm.where(JobV2.workflow_id == workflow_id)
-    res = await db.execute(stm)
-    running_jobs = res.scalars().all()
-    if len(running_jobs) == 0:
-        running_job = None
-    elif len(running_jobs) == 1:
-        running_job = running_jobs[0]
-    else:
-        string_ids = str([job.id for job in running_jobs])[1:-1]
-        raise HTTPException(
-            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
-            detail=(
-                f"Cannot get WorkflowTaskV2 statuses as DatasetV2 {dataset.id}"
-                f" is linked to multiple active jobs: {string_ids}."
-            ),
-        )
+    res = await db.execute(
+        _get_submitted_jobs_statement()
+        .where(JobV2.dataset_id == dataset_id)
+        .where(JobV2.workflow_id == workflow_id)
+    )
+    running_job = res.scalars().one_or_none()
 
     # Initialize empty dictionary for WorkflowTaskV2 status
     workflow_tasks_status_dict: dict = {}

New file: Alembic migration

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+"""One submitted Job per Dataset
+
+Revision ID: f0702066b007
+Revises: 7910eed4cf97
+Create Date: 2025-12-01 20:54:03.137093
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "f0702066b007"
+down_revision = "7910eed4cf97"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("jobv2", schema=None) as batch_op:
+        batch_op.create_index(
+            "ix_jobv2_one_submitted_job_per_dataset",
+            ["dataset_id"],
+            unique=True,
+            postgresql_where=sa.text("status = 'submitted'"),
+        )
+
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("jobv2", schema=None) as batch_op:
+        batch_op.drop_index(
+            "ix_jobv2_one_submitted_job_per_dataset",
+            postgresql_where=sa.text("status = 'submitted'"),
+        )
+
+    # ### end Alembic commands ###
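
Per the CHANGELOG warning, `upgrade()` fails if the table already violates the new invariant, i.e. if any dataset currently holds more than one submitted job. A hedged pre-upgrade check one could run beforehand (the connection URL is a placeholder; the query simply mirrors the index predicate):

    # Illustrative pre-upgrade check: list datasets that would break the new
    # unique index because they hold more than one submitted job.
    import sqlalchemy as sa

    engine = sa.create_engine("postgresql://localhost/fractal")  # placeholder URL
    with engine.connect() as conn:
        offenders = conn.execute(
            sa.text(
                "SELECT dataset_id, COUNT(*) AS n FROM jobv2 "
                "WHERE status = 'submitted' "
                "GROUP BY dataset_id HAVING COUNT(*) > 1"
            )
        ).all()
    for dataset_id, n in offenders:
        print(f"dataset {dataset_id}: {n} submitted jobs (resolve before upgrading)")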

New file: test for the unique index

Lines changed: 137 additions & 0 deletions

@@ -0,0 +1,137 @@
+import pytest
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm.attributes import flag_modified
+
+from fractal_server.app.models.v2 import DatasetV2
+from fractal_server.app.models.v2 import JobV2
+from fractal_server.app.models.v2 import ProjectV2
+from fractal_server.app.models.v2 import WorkflowV2
+from fractal_server.app.schemas.v2.job import JobStatusType
+
+
+async def test_unique_job_submitted_per_dataset(db, local_resource_profile_db):
+    resource, _ = local_resource_profile_db
+
+    project = ProjectV2(name="Project", resource_id=resource.id)
+    db.add(project)
+    await db.commit()
+    await db.refresh(project)
+
+    workflow = WorkflowV2(name="Workflow", project_id=project.id)
+    dataset1 = DatasetV2(
+        name="Dataset1", project_id=project.id, zarr_dir="/fake"
+    )
+    dataset2 = DatasetV2(
+        name="Dataset2", project_id=project.id, zarr_dir="/fake"
+    )
+    db.add_all([workflow, dataset1, dataset2])
+    await db.commit()
+    await db.refresh(workflow)
+    await db.refresh(dataset1)
+    await db.refresh(dataset2)
+
+    dataset1_id = dataset1.id
+    dataset2_id = dataset2.id
+
+    common_args = dict(
+        project_id=project.id,
+        workflow_id=workflow.id,
+        user_email="user@example.org",
+        dataset_dump={},
+        workflow_dump={},
+        project_dump={},
+        first_task_index=0,
+        last_task_index=0,
+        attribute_filters={},
+        type_filters={},
+    )
+
+    # Dataset 1, SUBMITTED -> OK
+    db.add(
+        JobV2(
+            dataset_id=dataset1_id,
+            status=JobStatusType.SUBMITTED,
+            **common_args,
+        )
+    )
+    await db.commit()
+
+    # Dataset 1, NON SUBMITTED -> OK
+    db.add(
+        JobV2(
+            dataset_id=dataset1_id,
+            status=JobStatusType.FAILED,
+            **common_args,
+        )
+    )
+    await db.commit()
+
+    # Dataset 1, SUBMITTED -> FAIL
+    db.add(
+        JobV2(
+            dataset_id=dataset1_id,
+            status=JobStatusType.SUBMITTED,
+            **common_args,
+        )
+    )
+    with pytest.raises(IntegrityError) as e:
+        await db.commit()
+    assert "ix_jobv2_one_submitted_job_per_dataset" in e.value.args[0]
+    await db.rollback()
+
+    # Dataset 2, SUBMITTED -> OK
+    db.add(
+        JobV2(
+            dataset_id=dataset2_id,
+            status=JobStatusType.SUBMITTED,
+            **common_args,
+        )
+    )
+    await db.commit()
+
+    # NOTE: the following tests a situation that should never happen,
+    # i.e. dataset_id=None, status="submitted"
+
+    # Dataset NULL, SUBMITTED -> OK
+    db.add(
+        JobV2(
+            dataset_id=None,
+            status=JobStatusType.SUBMITTED,
+            **common_args,
+        )
+    )
+    await db.commit()
+
+    # Dataset NULL, SUBMITTED -> OK
+    db.add(
+        JobV2(
+            dataset_id=None,
+            status=JobStatusType.SUBMITTED,
+            **common_args,
+        )
+    )
+    await db.commit()
+
+    # Dataset 2, test PATCH
+    job_to_patch = JobV2(
+        dataset_id=dataset2_id,
+        status=JobStatusType.FAILED,
+        **common_args,
+    )
+    db.add(job_to_patch)
+    await db.commit()
+
+    # PATCH status to DONE -> OK
+    job_to_patch.status = JobStatusType.DONE
+    flag_modified(job_to_patch, "status")
+    await db.commit()
+    await db.refresh(job_to_patch)
+    assert job_to_patch.status == JobStatusType.DONE
+
+    # PATCH status to SUBMITTED -> FAIL
+    job_to_patch.status = JobStatusType.SUBMITTED
+    flag_modified(job_to_patch, "status")
+    with pytest.raises(IntegrityError) as e:
+        await db.commit()
+    assert "ix_jobv2_one_submitted_job_per_dataset" in e.value.args[0]
+    await db.rollback()
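
The two `dataset_id=None` inserts above both succeed because, by default, a PostgreSQL unique index treats NULL values as distinct from one another (the index here is not created with `NULLS NOT DISTINCT`), so NULL-dataset rows never collide. A tiny standalone demonstration (hypothetical table `t`, placeholder connection URL):

    # Demonstration that NULL dataset_id rows never collide on the partial
    # unique index: NULL != NULL for uniqueness purposes in PostgreSQL.
    import sqlalchemy as sa

    engine = sa.create_engine("postgresql://localhost/scratch")  # placeholder URL
    with engine.begin() as conn:
        conn.execute(sa.text("CREATE TABLE t (dataset_id INT, status TEXT)"))
        conn.execute(
            sa.text(
                "CREATE UNIQUE INDEX one_submitted ON t (dataset_id) "
                "WHERE status = 'submitted'"
            )
        )
        # Both rows insert cleanly, mirroring the two dataset_id=None jobs above.
        conn.execute(sa.text("INSERT INTO t VALUES (NULL, 'submitted')"))
        conn.execute(sa.text("INSERT INTO t VALUES (NULL, 'submitted')"))
    print("both NULL-dataset submitted rows accepted")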

tests/v2/test_03_api/admin/test_admin_job.py

Lines changed: 5 additions & 1 deletion

@@ -66,6 +66,7 @@ async def test_view_job(
         project_id=project.id,
         log="log-b",
         dataset_id=dataset.id,
+        status=JobStatusType.DONE,
         workflow_id=workflow2.id,
         start_timestamp=datetime(2023, 1, 1, tzinfo=timezone.utc),
         end_timestamp=datetime(2023, 11, 9, tzinfo=timezone.utc),
@@ -134,7 +135,10 @@ async def test_view_job(
     assert len(res.json()["items"]) == 0
     res = await client.get(f"{PREFIX}/job/?status=submitted")
     assert res.status_code == 200
-    assert len(res.json()["items"]) == 2
+    assert len(res.json()["items"]) == 1
+    res = await client.get(f"{PREFIX}/job/?status=done")
+    assert res.status_code == 200
+    assert len(res.json()["items"]) == 1
 
     # get jobs by [start/end]_timestamp_[min/max]
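
The assertion change above follows from the new constraint: the fixture presumably created two submitted jobs sharing one dataset, which the partial unique index now forbids, so one of them is created as `DONE` instead and the former `status=submitted` count of 2 splits into one submitted and one done job.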
