Skip to content

Commit 454a2ae

Browse files
authored
Optimize exit code handling by relying on scheduler status for successful executions (#6484)
Unify exit code handling across all cloud executors (AWS Batch, Azure Batch, Google Batch, K8s). Key changes: - Google Batch: now reads exit code from API instead of .exitcode file, fixing Fusion exit code capture - All executors: fallback to .exitcode file only when API returns null (not when 0) Benefits: correct Fusion error reporting, reduced I/O overhead, lower storage costs, consistent behavior across providers.
1 parent 4eee1b6 commit 454a2ae

File tree

6 files changed

+297
-7
lines changed

6 files changed

+297
-7
lines changed

plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,11 +303,11 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
303303
final job = describeJob(jobId)
304304
final done = job?.status() in [JobStatus.SUCCEEDED, JobStatus.FAILED]
305305
if( done ) {
306-
// take the exit code of the container, if 0 (successful) or missing
306+
// take the exit code of the container, if missing (null)
307307
// take the exit code from the `.exitcode` file create by nextflow
308308
// the rationale of this is that, in case of error, the exit code return
309309
// by the batch API is more reliable.
310-
task.exitStatus = job.container().exitCode() ?: readExitFile()
310+
task.exitStatus = job.container()?.exitCode() != null ? job.container().exitCode() : readExitFile()
311311
// finalize the task
312312
task.stdout = outputFile
313313
if( job?.status() == JobStatus.FAILED || task.exitStatus==Integer.MAX_VALUE ) {
@@ -326,7 +326,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
326326
return false
327327
}
328328

329-
private int readExitFile() {
329+
protected int readExitFile() {
330330
try {
331331
exitFile.text as Integer
332332
}

plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package nextflow.cloud.aws.batch
1818

19+
import software.amazon.awssdk.services.batch.model.JobStatus
20+
1921
import java.nio.file.Path
2022
import java.time.Instant
2123

@@ -43,6 +45,7 @@ import nextflow.script.ProcessConfig
4345
import nextflow.util.CacheHelper
4446
import nextflow.util.MemoryUnit
4547
import software.amazon.awssdk.services.batch.BatchClient
48+
import software.amazon.awssdk.services.batch.model.ContainerDetail
4649
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsRequest
4750
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsResponse
4851
import software.amazon.awssdk.services.batch.model.DescribeJobsRequest
@@ -1134,8 +1137,110 @@ class AwsBatchTaskHandlerTest extends Specification {
11341137
2 | true | false | 2
11351138
and:
11361139
null | true | true | 5 // <-- default to 5
1137-
0 | true | true | 5 // <-- default to 5
1140+
0 | true | true | 5 // <-- default to 5
11381141
1 | true | true | 1
11391142
2 | true | true | 2
11401143
}
1144+
1145+
def 'should check if completed with exit code from scheduler'() {
1146+
given:
1147+
def task = new TaskRun()
1148+
def jobId = 'job-123'
1149+
def handler = Spy(new AwsBatchTaskHandler(task: task, jobId: jobId, status: TaskStatus.RUNNING))
1150+
and:
1151+
1152+
def job = JobDetail.builder().container(ContainerDetail.builder()
1153+
.exitCode(0).build()).status(JobStatus.SUCCEEDED)
1154+
.build()
1155+
1156+
when:
1157+
def result = handler.checkIfCompleted()
1158+
then:
1159+
1 * handler.describeJob('job-123') >> job
1160+
0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code
1161+
and:
1162+
result == true
1163+
handler.status == TaskStatus.COMPLETED
1164+
handler.task.exitStatus == 0
1165+
}
1166+
1167+
def 'should check if completed with non-zero exit code from scheduler'() {
1168+
given:
1169+
def task = new TaskRun()
1170+
def executor = Mock(AwsBatchExecutor)
1171+
def jobId = 'job-123'
1172+
def handler = Spy(new AwsBatchTaskHandler(task: task, jobId: jobId, status: TaskStatus.RUNNING, executor: executor))
1173+
and:
1174+
def job = JobDetail.builder().container(ContainerDetail.builder().exitCode(137).build())
1175+
.status(JobStatus.FAILED)
1176+
.statusReason('Task terminated')
1177+
.build()
1178+
1179+
when:
1180+
def result = handler.checkIfCompleted()
1181+
then:
1182+
1183+
1 * handler.describeJob(jobId) >> job
1184+
0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code
1185+
1 * executor.getJobOutputStream('job-123') >> null
1186+
and:
1187+
result == true
1188+
handler.status == TaskStatus.COMPLETED
1189+
handler.task.exitStatus == 137
1190+
1191+
}
1192+
1193+
def 'should check if completed and fallback to exit file when scheduler exit code is null'() {
1194+
given:
1195+
def task = new TaskRun()
1196+
task.name = 'hello'
1197+
def jobId = 'job-123'
1198+
def handler = Spy(new AwsBatchTaskHandler(task: task, jobId: jobId, status: TaskStatus.RUNNING))
1199+
and:
1200+
1201+
def job = JobDetail.builder().container(ContainerDetail.builder().build())
1202+
.status(JobStatus.SUCCEEDED)
1203+
.build()
1204+
1205+
1206+
when:
1207+
def result = handler.checkIfCompleted()
1208+
then:
1209+
1 * handler.describeJob('job-123') >> job
1210+
1 * handler.readExitFile() >> 0 // Should read exit file as fallback
1211+
and:
1212+
result == true
1213+
handler.status == TaskStatus.COMPLETED
1214+
handler.task.exitStatus == 0
1215+
1216+
}
1217+
1218+
def 'should check if completed no container exit code neither .exitcode file'() {
1219+
given:
1220+
def task = new TaskRun()
1221+
task.name = 'hello'
1222+
def jobId = 'job-123'
1223+
def executor = Mock(AwsBatchExecutor)
1224+
def handler = Spy(new AwsBatchTaskHandler(task: task, jobId: jobId, status: TaskStatus.RUNNING, executor: executor))
1225+
and:
1226+
1227+
def job = JobDetail.builder().container(ContainerDetail.builder().build())
1228+
.status(JobStatus.SUCCEEDED)
1229+
.statusReason('Unknown termination')
1230+
.build()
1231+
1232+
1233+
when:
1234+
def result = handler.checkIfCompleted()
1235+
then:
1236+
1 * handler.describeJob(jobId) >> job
1237+
1 * handler.readExitFile() >> Integer.MAX_VALUE // Should read exit file as fallback
1238+
1 * executor.getJobOutputStream(jobId) >> null
1239+
and:
1240+
result == true
1241+
handler.status == TaskStatus.COMPLETED
1242+
handler.task.exitStatus == Integer.MAX_VALUE
1243+
handler.task.error.message == 'Unknown termination'
1244+
1245+
}
11411246
}

plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package nextflow.cloud.azure.batch
1717

1818
import nextflow.exception.ProcessException
19+
import nextflow.util.TestOnly
1920

2021
import java.nio.file.Path
2122

@@ -114,7 +115,8 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
114115
if( done ) {
115116
// finalize the task
116117
final info = batchService.getTask(taskKey).executionInfo
117-
task.exitStatus = info?.exitCode ?: readExitFile()
118+
// Try to get exit code from Azure batch API and fallback to .exitcode
119+
task.exitStatus = info?.exitCode != null ? info.exitCode : readExitFile()
118120
task.stdout = outputFile
119121
task.stderr = errorFile
120122
status = TaskStatus.COMPLETED
@@ -205,4 +207,9 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
205207
return machineInfo
206208
}
207209

210+
@TestOnly
211+
protected setTaskKey(AzTaskKey key){
212+
this.taskKey = key
213+
}
214+
208215
}

plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package nextflow.cloud.azure.batch
22

3+
import com.azure.compute.batch.models.BatchTask
4+
import com.azure.compute.batch.models.BatchTaskExecutionInfo
5+
import com.azure.compute.batch.models.BatchTaskFailureInfo
6+
import com.azure.compute.batch.models.BatchTaskState
7+
import com.azure.compute.batch.models.ErrorCategory
8+
import com.sun.jna.platform.unix.X11
9+
import nextflow.processor.TaskStatus
10+
311
import java.nio.file.Path
412

513
import nextflow.cloud.types.CloudMachineInfo
@@ -99,4 +107,155 @@ class AzBatchTaskHandlerTest extends Specification {
99107
trace.machineInfo.zone == 'west-eu'
100108
trace.machineInfo.priceModel == PriceModel.standard
101109
}
110+
111+
def 'should check if completed with exit code from scheduler'() {
112+
given:
113+
def task = Spy(new TaskRun()){
114+
getContainer() >> 'ubuntu'
115+
}
116+
task.name = 'foo'
117+
task.workDir = Path.of('/tmp/wdir')
118+
def taskKey = new AzTaskKey('pool-123', 'job-456')
119+
def azTask = new BatchTask()
120+
def execInfo = new BatchTaskExecutionInfo(0,0)
121+
execInfo.exitCode = 0
122+
azTask.executionInfo = execInfo
123+
azTask.state = BatchTaskState.COMPLETED
124+
125+
def batchService = Mock(AzBatchService){
126+
getTask(taskKey) >> azTask
127+
}
128+
def executor = Mock(AzBatchExecutor){
129+
getBatchService() >> batchService
130+
}
131+
def handler = Spy(new AzBatchTaskHandler(task, executor)){
132+
deleteTask(_,_) >> null
133+
}
134+
handler.status = TaskStatus.RUNNING
135+
handler.taskKey = taskKey
136+
137+
when:
138+
def result = handler.checkIfCompleted()
139+
then:
140+
0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code
141+
and:
142+
result == true
143+
handler.task.exitStatus == 0
144+
handler.status == TaskStatus.COMPLETED
145+
146+
}
147+
148+
def 'should check if completed with non-zero exit code from scheduler'() {
149+
given:
150+
def task = Spy(new TaskRun()){
151+
getContainer() >> 'ubuntu'
152+
}
153+
task.name = 'foo'
154+
task.workDir = Path.of('/tmp/wdir')
155+
def taskKey = new AzTaskKey('pool-123', 'job-456')
156+
def azTask = new BatchTask()
157+
def execInfo = new BatchTaskExecutionInfo(0,0)
158+
execInfo.exitCode = 137
159+
azTask.executionInfo = execInfo
160+
azTask.state = BatchTaskState.COMPLETED
161+
162+
def batchService = Mock(AzBatchService){
163+
getTask(taskKey) >> azTask
164+
}
165+
def executor = Mock(AzBatchExecutor){
166+
getBatchService() >> batchService
167+
}
168+
def handler = Spy(new AzBatchTaskHandler(task, executor)){
169+
deleteTask(_,_) >> null
170+
}
171+
handler.status = TaskStatus.RUNNING
172+
handler.taskKey = taskKey
173+
174+
when:
175+
def result = handler.checkIfCompleted()
176+
then:
177+
0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code
178+
and:
179+
result == true
180+
handler.task.exitStatus == 137
181+
handler.status == TaskStatus.COMPLETED
182+
183+
184+
}
185+
186+
def 'should check if completed and fallback to exit file when scheduler exit code is null'() {
187+
given:
188+
def task = Spy(new TaskRun()){
189+
getContainer() >> 'ubuntu'
190+
}
191+
task.name = 'foo'
192+
task.workDir = Path.of('/tmp/wdir')
193+
def taskKey = new AzTaskKey('pool-123', 'job-456')
194+
def azTask = new BatchTask()
195+
def execInfo = new BatchTaskExecutionInfo(0,0)
196+
azTask.executionInfo = execInfo
197+
azTask.state = BatchTaskState.COMPLETED
198+
199+
def batchService = Mock(AzBatchService){
200+
getTask(taskKey) >> azTask
201+
}
202+
def executor = Mock(AzBatchExecutor){
203+
getBatchService() >> batchService
204+
}
205+
def handler = Spy(new AzBatchTaskHandler(task, executor)){
206+
deleteTask(_,_) >> null
207+
}
208+
handler.status = TaskStatus.RUNNING
209+
handler.taskKey = taskKey
210+
211+
when:
212+
def result = handler.checkIfCompleted()
213+
214+
then:
215+
1 * handler.readExitFile() >> 0 // Should read exit file as fallback
216+
and:
217+
result == true
218+
handler.task.exitStatus == 0
219+
handler.status == TaskStatus.COMPLETED
220+
}
221+
222+
def 'should check if completed and no scheduler exit code neither .exitcode file'() {
223+
given:
224+
def task = Spy(new TaskRun()){
225+
getContainer() >> 'ubuntu'
226+
}
227+
task.name = 'foo'
228+
task.workDir = Path.of('/tmp/wdir')
229+
def taskKey = new AzTaskKey('pool-123', 'job-456')
230+
def azTask = new BatchTask()
231+
def execInfo = new BatchTaskExecutionInfo(0,0)
232+
def failureInfo = new BatchTaskFailureInfo(ErrorCategory.USER_ERROR)
233+
failureInfo.message = 'Unknown error'
234+
execInfo.failureInfo = failureInfo
235+
azTask.executionInfo = execInfo
236+
azTask.state = BatchTaskState.COMPLETED
237+
238+
def batchService = Mock(AzBatchService){
239+
getTask(taskKey) >> azTask
240+
}
241+
def executor = Mock(AzBatchExecutor){
242+
getBatchService() >> batchService
243+
}
244+
def handler = Spy(new AzBatchTaskHandler(task, executor)){
245+
deleteTask(_,_) >> null
246+
}
247+
handler.status = TaskStatus.RUNNING
248+
handler.taskKey = taskKey
249+
250+
when:
251+
def result = handler.checkIfCompleted()
252+
253+
then:
254+
1 * handler.readExitFile() >> Integer.MAX_VALUE // Should read exit file as fallback
255+
and:
256+
result == true
257+
handler.task.exitStatus == Integer.MAX_VALUE
258+
handler.status == TaskStatus.COMPLETED
259+
handler.task.error.message == 'Unknown error'
260+
}
102261
}

plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,15 +430,15 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
430430
}
431431
else {
432432
// finalize the task
433-
// read the exit code from the K8s container terminated state, if 0 (successful) or missing
433+
// read the exit code from the K8s container terminated state, if missing
434434
// take the exit code from the `.exitcode` file created by nextflow
435435
// the rationale is that in case of error (e.g. OOMKilled, pod eviction), the exit code from
436436
// the K8s API is more reliable because the container may terminate before the exit file is written
437437
// See https://github.com/nextflow-io/nextflow/issues/6436
438438
// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#containerstateterminated-v1-core
439439
log.trace("[k8s] Container Terminated state ${state.terminated}")
440440
final k8sExitCode = (state.terminated as Map)?.exitCode as Integer
441-
task.exitStatus = k8sExitCode ?: readExitFile()
441+
task.exitStatus = k8sExitCode != null ? k8sExitCode : readExitFile()
442442
task.stdout = outputFile
443443
task.stderr = errorFile
444444
}

0 commit comments

Comments
 (0)