Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask {
*/
if( elapsedTimeMillis() > wallTimeMillis ) {
destroy()
task.exitStatus = process.exitValue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not make sense to use a predictable exit status (we event is known). Otherwise it can entirely depend on "destroy" implementation, and I'm not even sure that's going to a deterministic value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an interesting question. I think you could argue either way -- to make it consistent at Nextflow level, or make it consistent with the underlying OS, which the JVM implementation is likely to imitate

I figure that if a user has a pipeline with a mix of e.g. local and SLURM jobs, they would prefer to use the same error codes for both. That's why I lean towards deferring to the JVM

When I tested locally, it returned 143 which corresponds to SIGTERM. This is probably what most implementations do. Some might use 137 (SIGKILL) but that seems less likely since it's more aggressive

task.stdout = outputFile
task.stderr = errorFile
task.error = new ProcessException("Process exceeded running time limit (${task.config.getTime()})")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import java.nio.file.Path

import nextflow.Global
import nextflow.container.DockerConfig
import nextflow.exception.ProcessException
import nextflow.file.http.XPath
import nextflow.processor.TaskBean
import nextflow.processor.TaskConfig
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.util.Duration
import spock.lang.Specification
/**
*
Expand Down Expand Up @@ -83,4 +86,33 @@ class LocalTaskHandlerTest extends Specification {
cleanup:
builder?.redirectOutput()?.file()?.delete()
}

def 'should kill task when task exceeds time limit' () {
given:
def workDir = Path.of('/tmp/test-work-dir')
def task = Mock(TaskRun) {
getWorkDir() >> workDir
getConfig() >> Mock(TaskConfig) {
getTime() >> Duration.of(100)
}
}
and:
def handler = Spy(new LocalTaskHandler(task, Mock(LocalExecutor))) {
buildTaskWrapper() >> {}
elapsedTimeMillis() >> 200
}
handler.@process = Mock(Process) {
exitValue() >> 143 // Typical exit code for SIGTERM
}
handler.status = TaskStatus.RUNNING

when:
def completed = handler.checkIfCompleted()

then:
completed == true
1 * task.setExitStatus(143)
1 * task.setError(_ as ProcessException)
handler.status == TaskStatus.COMPLETED
}
}
Loading