
Commit c2f8fd6

Merge pull request #3041 from fractal-analytics-platform/3039-review-job-submission-endpoint
Review job-submission endpoint
2 parents: 2670ca2 + 46b368c

4 files changed: +272 -198 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@ The main contents of this release are the introduction of the project sharing an
     * Reduce API logging level for some endpoints (\#3010).
     * Modify `GET /auth/current-user/allowed-viewer-paths/` logic, with `include_shared_projects` query parameter (\#3031).
     * Add validator for paths to forbid parent-directory references (\#3031).
+    * Review job-submission endpoint (\#3041).
 * App:
     * Add `SlowResponseMiddleware` middleware (\#3035, \#3038).
 * Settings:

fractal_server/app/routes/api/v2/_aux_functions.py

Lines changed: 17 additions & 4 deletions
@@ -3,7 +3,7 @@
 """
 
 from typing import Any
-from typing import Literal
+from typing import TypedDict
 
 from fastapi import HTTPException
 from fastapi import status
@@ -252,14 +252,19 @@ async def _check_project_exists(
     )
 
 
+class DatasetOrProject(TypedDict):
+    dataset: DatasetV2
+    project: ProjectV2
+
+
 async def _get_dataset_check_access(
     *,
     project_id: int,
     dataset_id: int,
     user_id: int,
     required_permissions: ProjectPermissions,
     db: AsyncSession,
-) -> dict[Literal["dataset", "project"], DatasetV2 | ProjectV2]:
+) -> DatasetOrProject:
     """
     Get a dataset and a project, after access control on the project
@@ -304,14 +309,19 @@ async def _get_dataset_check_access(
     return dict(dataset=dataset, project=project)
 
 
+class JobAndProject(TypedDict):
+    job: JobV2
+    project: ProjectV2
+
+
 async def _get_job_check_access(
     *,
     project_id: int,
     job_id: int,
     user_id: int,
     required_permissions: ProjectPermissions,
     db: AsyncSession,
-) -> dict[Literal["job", "project"], JobV2 | ProjectV2]:
+) -> JobAndProject:
     """
     Get a job and a project, after access control on the project
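
Note: the two `TypedDict` classes introduced above replace `dict[Literal[...], ...]` return annotations. With the `Literal`-keyed dict, every value is typed as a union and callers must narrow or cast; a `TypedDict` gives static checkers a precise type per key. A minimal sketch of the difference, with toy stand-ins for `DatasetV2` and `ProjectV2` (illustrative stubs, not the real models):

from typing import Literal
from typing import TypedDict


class Dataset:  # toy stand-in for DatasetV2
    name: str = "my-dataset"


class Project:  # toy stand-in for ProjectV2
    id: int = 1


class DatasetOrProject(TypedDict):
    dataset: Dataset
    project: Project


def old_style() -> dict[Literal["dataset", "project"], Dataset | Project]:
    # Every value has type `Dataset | Project`: callers must narrow or cast.
    return {"dataset": Dataset(), "project": Project()}


def new_style() -> DatasetOrProject:
    # Per-key types: a checker knows result["dataset"] is a Dataset.
    return {"dataset": Dataset(), "project": Project()}


result = new_style()
print(result["dataset"].name)  # type-checks without a cast or isinstance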
@@ -454,7 +464,8 @@ async def _workflow_insert_task(
 
 
 async def clean_app_job_list(
-    db: AsyncSession, jobs_list: list[int]
+    db: AsyncSession,
+    jobs_list: list[int],
 ) -> list[int]:
     """
     Remove from a job list all jobs with status different from submitted.
@@ -466,12 +477,14 @@ async def clean_app_job_list(
     Return:
         List of IDs for submitted jobs.
     """
+    logger.info(f"[clean_app_job_list] START - {jobs_list=}.")
     stmt = select(JobV2).where(JobV2.id.in_(jobs_list))
     result = await db.execute(stmt)
     db_jobs_list = result.scalars().all()
     submitted_job_ids = [
         job.id for job in db_jobs_list if job.status == JobStatusType.SUBMITTED
     ]
+    logger.info(f"[clean_app_job_list] END - {submitted_job_ids=}.")
     return submitted_job_ids
 
 
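Note: the new START/END log lines use the f-string `=` specifier (Python 3.8+), which prints both the expression and its value, making the message self-labeling. A standalone illustration:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("clean_app_job_list")

jobs_list = [101, 102, 103]
submitted_job_ids = [102]

# `{jobs_list=}` expands to "jobs_list=[101, 102, 103]" in the message.
logger.info(f"[clean_app_job_list] START - {jobs_list=}.")
logger.info(f"[clean_app_job_list] END - {submitted_job_ids=}.")
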
fractal_server/app/routes/api/v2/submit.py

Lines changed: 48 additions & 55 deletions
@@ -9,6 +9,7 @@
 from fastapi import Request
 from fastapi import status
 from sqlmodel import select
+from sqlmodel import update
 
 from fractal_server.app.db import AsyncSession
 from fractal_server.app.db import get_async_db
@@ -51,7 +52,7 @@
     status_code=status.HTTP_202_ACCEPTED,
     response_model=JobRead,
 )
-async def apply_workflow(
+async def submit_job(
     project_id: int,
     workflow_id: int,
     dataset_id: int,
@@ -146,34 +147,6 @@ async def apply_workflow(
         db=db,
     )
 
-    # Check that no other job with the same dataset_id is SUBMITTED
-    stm = (
-        select(JobV2)
-        .where(JobV2.dataset_id == dataset_id)
-        .where(JobV2.status == JobStatusType.SUBMITTED)
-    )
-    res = await db.execute(stm)
-    if res.scalars().all():
-        raise HTTPException(
-            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
-            detail=(
-                f"Dataset {dataset_id} is already in use in submitted job(s)."
-            ),
-        )
-
-    if job_create.slurm_account is not None:
-        if job_create.slurm_account not in user.slurm_accounts:
-            raise HTTPException(
-                status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
-                detail=(
-                    f"SLURM account '{job_create.slurm_account}' is not "
-                    "among those available to the current user"
-                ),
-            )
-    else:
-        if len(user.slurm_accounts) > 0:
-            job_create.slurm_account = user.slurm_accounts[0]
-
     # User appropriate FractalSSH object
     if resource.type == ResourceType.SLURM_SSH:
         ssh_config = dict(
@@ -196,6 +169,35 @@ async def apply_workflow(
     else:
         fractal_ssh = None
 
+    # Assign `job_create.slurm_account`
+    if job_create.slurm_account is not None:
+        if job_create.slurm_account not in user.slurm_accounts:
+            raise HTTPException(
+                status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
+                detail=(
+                    f"SLURM account '{job_create.slurm_account}' is not "
+                    "among those available to the current user"
+                ),
+            )
+    else:
+        if len(user.slurm_accounts) > 0:
+            job_create.slurm_account = user.slurm_accounts[0]
+
+    # Check that no other job with the same dataset_id is SUBMITTED
+    stm = (
+        select(JobV2)
+        .where(JobV2.dataset_id == dataset_id)
+        .where(JobV2.status == JobStatusType.SUBMITTED)
+    )
+    res = await db.execute(stm)
+    if res.scalars().all():
+        raise HTTPException(
+            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
+            detail=(
+                f"Dataset {dataset_id} is already in use in submitted job(s)."
+            ),
+        )
+
     # Add new Job object to DB
     job = JobV2(
         project_id=project_id,
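
Note: the two blocks above are moved after the FractalSSH setup with their logic unchanged. The SLURM-account rule is: an explicitly requested account must be among the user's accounts, otherwise the first available account (if any) is used. A sketch of that rule as a hypothetical standalone helper (not part of this PR; the endpoint raises HTTP 422 where this raises ValueError):

def resolve_slurm_account(
    requested: str | None,
    available: list[str],
) -> str | None:
    """Validate a requested SLURM account, or fall back to the first one."""
    if requested is not None:
        if requested not in available:
            # The endpoint reports this case as an HTTP 422 error.
            raise ValueError(
                f"SLURM account '{requested}' is not among those "
                "available to the current user"
            )
        return requested
    return available[0] if available else None


assert resolve_slurm_account(None, ["acct-a", "acct-b"]) == "acct-a"
assert resolve_slurm_account("acct-b", ["acct-a", "acct-b"]) == "acct-b"
assert resolve_slurm_account(None, []) is None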
@@ -219,38 +221,31 @@ async def apply_workflow(
     await db.refresh(job)
 
     # Update TaskGroupV2.timestamp_last_used
-    res = await db.execute(
-        select(TaskGroupV2).where(TaskGroupV2.id.in_(used_task_group_ids))
+    await db.execute(
+        update(TaskGroupV2)
+        .where(TaskGroupV2.id.in_(used_task_group_ids))
+        .values(timestamp_last_used=job.start_timestamp)
     )
-    used_task_groups = res.scalars().all()
-    for used_task_group in used_task_groups:
-        used_task_group.timestamp_last_used = job.start_timestamp
-        db.add(used_task_group)
     await db.commit()
 
-    # Define server-side job directory
-    timestamp_string = job.start_timestamp.strftime("%Y%m%d_%H%M%S")
-    WORKFLOW_DIR_LOCAL = Path(resource.jobs_local_dir) / (
+    # Define `cache_dir`
+    cache_dir = Path(user.project_dirs[0], FRACTAL_CACHE_DIR)
+
+    # Define server-side and user-side job directories
+    timestamp_string = job.start_timestamp.strftime(r"%Y%m%d_%H%M%S")
+    working_dir = Path(resource.jobs_local_dir) / (
         f"proj_v2_{project_id:07d}_wf_{workflow_id:07d}_job_{job.id:07d}"
         f"_{timestamp_string}"
     )
-
-    # Define user-side job directory
-    cache_dir = Path(user.project_dirs[0], FRACTAL_CACHE_DIR)
     match resource.type:
         case ResourceType.LOCAL:
-            WORKFLOW_DIR_REMOTE = WORKFLOW_DIR_LOCAL
+            working_dir_user = working_dir
         case ResourceType.SLURM_SUDO:
-            WORKFLOW_DIR_REMOTE = cache_dir / WORKFLOW_DIR_LOCAL.name
+            working_dir_user = cache_dir / working_dir.name
         case ResourceType.SLURM_SSH:
-            WORKFLOW_DIR_REMOTE = Path(
-                profile.jobs_remote_dir,
-                WORKFLOW_DIR_LOCAL.name,
-            )
-
-    # Update job folders in the db
-    job.working_dir = WORKFLOW_DIR_LOCAL.as_posix()
-    job.working_dir_user = WORKFLOW_DIR_REMOTE.as_posix()
+            working_dir_user = Path(profile.jobs_remote_dir, working_dir.name)
+    job.working_dir = working_dir.as_posix()
+    job.working_dir_user = working_dir_user.as_posix()
     await db.merge(job)
     await db.commit()
 
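Note: the `timestamp_last_used` change above replaces a select-then-mutate loop with a single `UPDATE ... WHERE id IN (...)` statement, so all matching rows are updated in one round trip instead of being loaded as ORM objects. A minimal, self-contained sketch of the same pattern using plain synchronous SQLAlchemy and a toy table (`TaskGroup` is a stand-in, not the real `TaskGroupV2` model):

from datetime import datetime
from datetime import timezone

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import create_engine
from sqlalchemy import select
from sqlalchemy import update
from sqlalchemy.orm import Session
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class TaskGroup(Base):  # toy stand-in for TaskGroupV2
    __tablename__ = "taskgroup"
    id = Column(Integer, primary_key=True)
    timestamp_last_used = Column(DateTime(timezone=True))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as db:
    db.add_all([TaskGroup(id=i) for i in (1, 2, 3)])
    db.commit()

    used_task_group_ids = [1, 3]
    start_timestamp = datetime.now(timezone.utc)

    # Single UPDATE statement: no ORM objects are loaded or re-added.
    db.execute(
        update(TaskGroup)
        .where(TaskGroup.id.in_(used_task_group_ids))
        .values(timestamp_last_used=start_timestamp)
    )
    db.commit()

    print(db.execute(select(TaskGroup.id, TaskGroup.timestamp_last_used)).all())
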
@@ -268,9 +263,7 @@ async def apply_workflow(
     )
     request.app.state.jobs.append(job.id)
     logger.info(
-        f"Current worker's pid is {os.getpid()}. "
-        f"Current status of worker job's list "
-        f"{request.app.state.jobs}"
+        f"Job {job.id}, worker with pid {os.getpid()}. "
+        f"Worker jobs list: {request.app.state.jobs}."
     )
-    await db.close()
     return job
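
Note: dropping the explicit `await db.close()` is consistent with the session coming from a FastAPI dependency with a `yield`, whose cleanup runs after the response is sent. A sketch of that pattern, under the assumption that `get_async_db` works roughly like this (the actual implementation in fractal-server may differ):

from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("sqlite+aiosqlite://")
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)


async def get_async_db() -> AsyncIterator[AsyncSession]:
    # FastAPI runs the code after `yield` once the response is done, so
    # endpoints do not need an explicit `await db.close()`.
    async with async_session_maker() as session:
        yield session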
