From ee02d3aeba3d752a23ebf8fa7b30f870f55ed829 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 14 Nov 2025 12:53:35 -0600 Subject: [PATCH] Move task hashing logic to separate class Signed-off-by: Ben Sherman --- .../nextflow/processor/TaskHasher.groovy | 257 ++++++++++++++++++ .../nextflow/processor/TaskProcessor.groovy | 183 +------------ .../nextflow/processor/TaskHasherTest.groovy | 145 ++++++++++ .../processor/TaskProcessorTest.groovy | 108 -------- .../main/nextflow/lineage/LinObserver.groovy | 5 +- 5 files changed, 406 insertions(+), 292 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy create mode 100644 modules/nextflow/src/test/groovy/nextflow/processor/TaskHasherTest.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy new file mode 100644 index 0000000000..4429f7b221 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy @@ -0,0 +1,257 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.processor + +import java.nio.file.Path + +import com.google.common.hash.HashCode +import groovy.json.JsonOutput +import groovy.transform.CompileStatic +import groovy.transform.Memoized +import groovy.util.logging.Slf4j +import nextflow.Session +import nextflow.exception.UnexpectedException +import nextflow.util.CacheHelper +/** + * Implement task hash computation + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class TaskHasher { + + private TaskRun task + + private TaskProcessor processor + + private Session session + + public TaskHasher(TaskRun task) { + this.task = task + this.processor = task.processor + this.session = task.processor.session + } + + public HashCode compute() { + + final keys = new ArrayList() + + // add session UUID + keys << session.uniqueId + + // add fully-qualified process name + keys << task.processor.name + + // add source code of `script:` or `exec:` block + // + // - this allows task script to reference directives like `task.cpus` + // without invalidating the cache + // + // - references to local variables, global variables, and `task.ext` + // are included separately + keys << task.source + + // add container fingerprint if present + if( task.isContainerEnabled() ) + keys << task.getContainerFingerprint() + + // add the name and value of each task input + for( final entry : task.inputs ) { + keys.add( entry.key.name ) + keys.add( entry.value ) + } + + // add eval output commands + final outEvals = task.getOutputEvals() + if( outEvals ) { + keys.add("eval_outputs") + keys.add(computeEvalOutputCommands(outEvals)) + } + + // add variables referenced in the task script but not declared as input/output + def vars = getTaskGlobalVars() + if( vars ) { + log.trace "Task: ${task.processor.name} > Adding script vars hash code: ${vars}" + keys.add(vars.entrySet()) + } + + // add bin scripts referenced in the task script + final binEntries = getTaskBinEntries(task.source) + if( binEntries ) { + log.trace "Task: ${task.processor.name} > Adding scripts on project bin path: ${-> binEntries.join('; ')}" + keys.addAll(binEntries) + } + + // add environment modules (`module` directive) + final modules = task.getConfig().getModule() + if( modules ) { + keys.addAll(modules) + } + + // add conda packages (`conda` directive) + final conda = task.getCondaEnv() + if( conda ) { + keys.add(conda) + } + + // add spack packages (`spack` and `arch` directives) + final spack = task.getSpackEnv() + final arch = task.getConfig().getArchitecture() + + if( spack ) { + keys.add(spack) + + if( arch ) { + keys.add(arch) + } + } + + // add stub run marker if enabled + if( session.stubRun && task.config.getStubBlock() ) { + keys.add('stub-run') + } + + // compute task hash + final mode = task.processor.getConfig().getHashMode() + final hash = computeHash(keys, mode) + + // log task hash entries if enabled + if( session.dumpHashes ) { + session.dumpHashes == 'json' + ? dumpHashEntriesJson(task, keys, mode, hash) + : dumpHashEntriesLegacy(task, keys, mode, hash) + } + + return hash + } + + /** + * Compute a deterministic string representation of eval output commands for cache hashing. + * This method creates a consistent hash key based on the semantic names and command values + * of eval outputs, ensuring cache invalidation when eval outputs change. + * + * @param outEvals Map of eval parameter names to their command strings + * @return A concatenated string of "name=command" pairs, sorted for deterministic hashing + */ + protected static String computeEvalOutputCommands(Map outEvals) { + // Assert precondition that outEvals should not be null or empty when this method is called + assert outEvals != null && !outEvals.isEmpty(), "Eval outputs should not be null or empty" + + final result = new StringBuilder() + + // Sort entries by key for deterministic ordering. This ensures that the same set of + // eval outputs always produces the same hash regardless of map iteration order, + // which is critical for cache consistency across different JVM runs. + // Without sorting, HashMap iteration order can vary between executions, leading to + // different cache keys for identical eval output configurations and causing + // unnecessary cache misses and task re-execution + final sortedEntries = outEvals.entrySet().sort { a, b -> a.key.compareTo(b.key) } + + // Build content using for loop to concatenate "name=command" pairs. + // This creates a symmetric pattern with input parameter hashing where both + // the parameter name and its value contribute to the cache key + for( final entry : sortedEntries ) { + // Add newline separator between entries for readability in debug scenarios + if( result.length() > 0 ) { + result.append('\n') + } + // Format: "semantic_name=bash_command" - both name and command value are + // included because changing either should invalidate the task cache + result.append(entry.key).append('=').append(entry.value) + } + + return result.toString() + } + + /** + * Get the mapping of global variables that were referenced by + * the task script, excluding references to `task.ext`. + */ + Map getTaskGlobalVars() { + final result = task.getGlobalVars(task.processor.getOwnerScript().getBinding()) + final directives = getTaskExtensionDirectiveVars() + result.putAll(directives) + return result + } + + protected Map getTaskExtensionDirectiveVars() { + final variableNames = task.getVariableNames() + final result = new HashMap(variableNames.size()) + final taskConfig = task.config + for( final key : variableNames ) { + if( !key.startsWith('task.ext.') ) + continue + final value = taskConfig.eval(key.substring(5)) + result.put(key, value) + } + + return result + } + + /** + * This method scans the task command string looking for invocations of scripts + * defined in the project bin folder. + * + * @param script The task command string + * @return The list of paths of scripts in the project bin folder referenced in the task command + */ + @Memoized + List getTaskBinEntries(String script) { + List result = [] + final tokenizer = new StringTokenizer(script, " \t\n\r\f()[]{};&|<>`") + while( tokenizer.hasMoreTokens() ) { + final token = tokenizer.nextToken() + final path = session.binEntries.get(token) + if( path ) + result.add(path) + } + return result + } + + private String safeTaskName(TaskRun task) { + return task != null ? task.lazyName() : task.processor.name + } + + private HashCode computeHash(List keys, CacheHelper.HashMode mode) { + try { + return CacheHelper.hasher(keys, mode).hash() + } + catch (Throwable e) { + final msg = "Something went wrong while creating task hash for process '${task.processor.name}' -- Offending keys: ${ keys.collect { k -> "\n - type=${k.getClass().getName()} value=$k" } }" + throw new UnexpectedException(msg,e) + } + } + + private void dumpHashEntriesJson(TaskRun task, List entries, CacheHelper.HashMode mode, hash) { + final collector = (item) -> [ + hash: CacheHelper.hasher(item, mode).hash().toString(), + type: item?.getClass()?.getName(), + value: item?.toString() + ] + final json = JsonOutput.toJson(entries.collect(collector)) + log.info "[${safeTaskName(task)}] cache hash: ${hash}; mode: ${mode}; entries: ${JsonOutput.prettyPrint(json)}" + } + + private void dumpHashEntriesLegacy(TaskRun task, List entries, CacheHelper.HashMode mode, hash) { + final buffer = new StringBuilder() + buffer.append("[${safeTaskName(task)}] cache hash: ${hash}; mode: $mode; entries: \n") + for( final entry : entries ) { + buffer.append( " ${CacheHelper.hasher(entry, mode).hash()} [${entry?.getClass()?.getName()}] $entry \n") + } + log.info(buffer.toString()) + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index f0434e69c7..4ebfa77807 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -27,7 +27,6 @@ import java.util.regex.Pattern import ch.artecat.grengine.Grengine import com.google.common.hash.HashCode -import groovy.json.JsonOutput import groovy.transform.CompileStatic import groovy.transform.Memoized import groovy.transform.PackageScope @@ -60,7 +59,6 @@ import nextflow.exception.ProcessFailedException import nextflow.exception.ProcessRetryableException import nextflow.exception.ProcessSubmitTimeoutException import nextflow.exception.ProcessUnrecoverableException -import nextflow.exception.UnexpectedException import nextflow.executor.CachedTaskHandler import nextflow.executor.Executor import nextflow.executor.StoredTaskHandler @@ -99,7 +97,6 @@ import nextflow.script.params.v2.ProcessInput import nextflow.script.params.v2.ProcessTupleInput import nextflow.script.types.Types import nextflow.trace.TraceRecord -import nextflow.util.CacheHelper import nextflow.util.Escape import nextflow.util.HashBuilder import nextflow.util.LockManager @@ -670,7 +667,7 @@ class TaskProcessor { // -- download foreign files session.filePorter.transfer(foreignFiles) - def hash = createTaskHashKey(task) + final hash = new TaskHasher(task).compute() checkCachedOrLaunchTask(task, hash, resumable) } @@ -1835,146 +1832,6 @@ class TaskProcessor { task.setInput(param, value) } - final protected HashCode createTaskHashKey(TaskRun task) { - - List keys = [ session.uniqueId, name, task.source ] - - if( task.isContainerEnabled() ) - keys << task.getContainerFingerprint() - - // add all the input name-value pairs to the key generator - for( Map.Entry it : task.inputs ) { - keys.add( it.key.name ) - keys.add( it.value ) - } - - // add eval output commands to the hash for proper cache invalidation (fixes issue #5470) - final outEvals = task.getOutputEvals() - if( outEvals ) { - keys.add("eval_outputs") - keys.add(computeEvalOutputsContent(outEvals)) - } - - // add all variable references in the task script but not declared as input/output - def vars = getTaskGlobalVars(task) - if( vars ) { - log.trace "Task: $name > Adding script vars hash code: ${vars}" - keys.add(vars.entrySet()) - } - - final binEntries = getTaskBinEntries(task.source) - if( binEntries ) { - log.trace "Task: $name > Adding scripts on project bin path: ${-> binEntries.join('; ')}" - keys.addAll(binEntries) - } - - final modules = task.getConfig().getModule() - if( modules ) { - keys.addAll(modules) - } - - final conda = task.getCondaEnv() - if( conda ) { - keys.add(conda) - } - - final spack = task.getSpackEnv() - final arch = task.getConfig().getArchitecture() - - if( spack ) { - keys.add(spack) - - if( arch ) { - keys.add(arch) - } - } - - if( session.stubRun && task.config.getStubBlock() ) { - keys.add('stub-run') - } - - final mode = config.getHashMode() - final hash = computeHash(keys, mode) - if( session.dumpHashes ) { - session.dumpHashes=='json' - ? traceInputsHashesJson(task, keys, mode, hash) - : traceInputsHashes(task, keys, mode, hash) - } - return hash - } - - HashCode computeHash(List keys, CacheHelper.HashMode mode) { - try { - return CacheHelper.hasher(keys, mode).hash() - } - catch (Throwable e) { - final msg = "Something went wrong while creating task '$name' unique id -- Offending keys: ${ keys.collect {"\n - type=${it.getClass().getName()} value=$it"} }" - throw new UnexpectedException(msg,e) - } - } - - - /** - * This method scans the task command string looking for invocations of scripts - * defined in the project bin folder. - * - * @param script The task command string - * @return The list of paths of scripts in the project bin folder referenced in the task command - */ - @Memoized - List getTaskBinEntries(String script) { - List result = [] - def tokenizer = new StringTokenizer(script," \t\n\r\f()[]{};&|<>`") - while( tokenizer.hasMoreTokens() ) { - def token = tokenizer.nextToken() - def path = session.binEntries.get(token) - if( path ) - result.add(path) - } - return result - } - - private void traceInputsHashesJson( TaskRun task, List entries, CacheHelper.HashMode mode, hash ) { - final collector = (item) -> [ - hash: CacheHelper.hasher(item, mode).hash().toString(), - type: item?.getClass()?.getName(), - value: item?.toString() - ] - final json = JsonOutput.toJson(entries.collect(collector)) - log.info "[${safeTaskName(task)}] cache hash: ${hash}; mode: ${mode}; entries: ${JsonOutput.prettyPrint(json)}" - } - - private void traceInputsHashes( TaskRun task, List entries, CacheHelper.HashMode mode, hash ) { - - def buffer = new StringBuilder() - buffer.append("[${safeTaskName(task)}] cache hash: ${hash}; mode: $mode; entries: \n") - for( Object item : entries ) { - buffer.append( " ${CacheHelper.hasher(item, mode).hash()} [${item?.getClass()?.getName()}] $item \n") - } - - log.info(buffer.toString()) - } - - Map getTaskGlobalVars(TaskRun task) { - final result = task.getGlobalVars(ownerScript.binding) - final directives = getTaskExtensionDirectiveVars(task) - result.putAll(directives) - return result - } - - protected Map getTaskExtensionDirectiveVars(TaskRun task) { - final variableNames = task.getVariableNames() - final result = new HashMap(variableNames.size()) - final taskConfig = task.config - for( String key : variableNames ) { - if( !key.startsWith('task.ext.') ) continue - final value = taskConfig.eval(key.substring(5)) - result.put(key, value) - } - - return result - } - /** * Execute the specified task shell script * @@ -2269,42 +2126,4 @@ class TaskProcessor { handleException( error, currentTask.get() ) } } - - /** - * Compute a deterministic string representation of eval output commands for cache hashing. - * This method creates a consistent hash key based on the semantic names and command values - * of eval outputs, ensuring cache invalidation when eval outputs change. - * - * @param outEvals Map of eval parameter names to their command strings - * @return A concatenated string of "name=command" pairs, sorted for deterministic hashing - */ - protected String computeEvalOutputsContent(Map outEvals) { - // Assert precondition that outEvals should not be null or empty when this method is called - assert outEvals != null && !outEvals.isEmpty(), "Eval outputs should not be null or empty" - - final result = new StringBuilder() - - // Sort entries by key for deterministic ordering. This ensures that the same set of - // eval outputs always produces the same hash regardless of map iteration order, - // which is critical for cache consistency across different JVM runs. - // Without sorting, HashMap iteration order can vary between executions, leading to - // different cache keys for identical eval output configurations and causing - // unnecessary cache misses and task re-execution - final sortedEntries = outEvals.entrySet().sort { a, b -> a.key.compareTo(b.key) } - - // Build content using for loop to concatenate "name=command" pairs. - // This creates a symmetric pattern with input parameter hashing where both - // the parameter name and its value contribute to the cache key - for( Map.Entry entry : sortedEntries ) { - // Add newline separator between entries for readability in debug scenarios - if( result.length() > 0 ) { - result.append('\n') - } - // Format: "semantic_name=bash_command" - both name and command value are - // included because changing either should invalidate the task cache - result.append(entry.key).append('=').append(entry.value) - } - - return result.toString() - } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHasherTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHasherTest.groovy new file mode 100644 index 0000000000..85abc8244f --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHasherTest.groovy @@ -0,0 +1,145 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.processor + +import java.nio.file.Path + +import nextflow.Session +import nextflow.script.ProcessConfig +import spock.lang.Specification +/** + * + * @author Paolo Di Tommaso + */ +class TaskHasherTest extends Specification { + + def 'should compute unique task hash' () { + + given: + def session = Mock(Session) { + getUniqueId() >> UUID.fromString('b69b6eeb-b332-4d2c-9957-c291b15f498c') + getBinEntries() >> [:] + } + def processor = Mock(TaskProcessor) { + getName() >> 'hello' + getSession() >> session + getConfig() >> Mock(ProcessConfig) + } + and: + def task = Mock(TaskRun) { + getSource() >> 'hello world' + isContainerEnabled() >> false + getContainer() >> null + getConfig() >> Mock(TaskConfig) + getProcessor() >> processor + } + and: + def hasher = Spy(new TaskHasher(task)) + + when: + def uuid1 = hasher.compute() + def uuid2 = hasher.compute() + then: + hasher.getTaskGlobalVars() >>> [ + [foo:'a', bar:'b'], + [bar:'b', foo:'a'] + ] + and: 'global vars should not affect task hash' + uuid1 == uuid2 + } + + def 'should include referenced bin files in the task hash' () { + + given: + def session = Mock(Session) { + getBinEntries() >> [ + 'foo.sh': Path.of('/some/path/foo.sh'), + 'bar.sh': Path.of('/some/path/bar.sh') + ] + } + def processor = Mock(TaskProcessor) { + getName() >> 'hello' + getSession() >> session + } + def task = Mock(TaskRun) { + getProcessor() >> processor + } + def hasher = new TaskHasher(task) + + when: + def result = hasher.getTaskBinEntries('var=x foo.sh') + then: + result.size() == 1 + result.contains(Path.of('/some/path/foo.sh')) + + when: + result = hasher.getTaskBinEntries('echo $(foo.sh); bar.sh') + then: + result.size() == 2 + result.contains(Path.of('/some/path/foo.sh')) + result.contains(Path.of('/some/path/bar.sh')) + } + + def 'should get task directive vars' () { + given: + def processor = Spy(TaskProcessor) { + getConfig() >> Mock(ProcessConfig) + } + and: + def config = new TaskConfig() + config.cpus = 4 + config.ext.alpha = 'AAAA' + config.ext.delta = { foo } + config.ext.omega = "${-> bar}" + config.context = [foo: 'DDDD', bar: 'OOOO'] + and: + def task = Mock(TaskRun) { + getConfig() >> config + getProcessor() >> processor + getVariableNames() >> { [ 'task.cpus', 'task.ext.alpha', 'task.ext.delta', 'task.ext.omega' ] as Set } + } + + when: + def result = new TaskHasher(task).getTaskExtensionDirectiveVars() + then: + result == [ + 'task.ext.alpha': 'AAAA', + 'task.ext.delta': 'DDDD', + 'task.ext.omega': 'OOOO', + ] + } + + def 'should compute hash entries for eval outputs'() { + + when: + def result1 = TaskHasher.computeEvalOutputCommands([ + 'nxf_out_eval_2': 'echo "value2"', + 'nxf_out_eval_1': 'echo "value1"', + 'nxf_out_eval_3': 'echo "value3"' + ]) + + def result2 = TaskHasher.computeEvalOutputCommands([ + 'nxf_out_eval_3': 'echo "value3"', + 'nxf_out_eval_1': 'echo "value1"', + 'nxf_out_eval_2': 'echo "value2"' + ]) + + then: + result1 == result2 + result1 == 'nxf_out_eval_1=echo "value1"\nnxf_out_eval_2=echo "value2"\nnxf_out_eval_3=echo "value3"' + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy index 0176f66b4a..137dd54b89 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy @@ -300,63 +300,6 @@ class TaskProcessorTest extends Specification { } - - def 'should get bin files in the script command' () { - - given: - def session = Mock(Session) - session.getBinEntries() >> ['foo.sh': Paths.get('/some/path/foo.sh'), 'bar.sh': Paths.get('/some/path/bar.sh')] - def processor = [:] as TaskProcessor - processor.session = session - - when: - def result = processor.getTaskBinEntries('var=x foo.sh') - then: - result.size()==1 - result.contains(Paths.get('/some/path/foo.sh')) - - when: - result = processor.getTaskBinEntries('echo $(foo.sh); bar.sh') - then: - result.size()==2 - result.contains(Paths.get('/some/path/foo.sh')) - result.contains(Paths.get('/some/path/bar.sh')) - - } - - def 'should make task unique id' () { - - given: - def session = Mock(Session) { - getUniqueId() >> UUID.fromString('b69b6eeb-b332-4d2c-9957-c291b15f498c') - getBinEntries() >> ['foo.sh': Paths.get('/some/path/foo.sh'), 'bar.sh': Paths.get('/some/path/bar.sh')] - } - and: - def task = Mock(TaskRun) { - getSource() >> 'hello world' - isContainerEnabled() >> false - getContainer() >> null - getConfig() >> Mock(TaskConfig) - } - and: - def processor = Spy(TaskProcessor) - processor.@session = session - processor.@config = Mock(ProcessConfig) - - when: - def uuid1 = processor.createTaskHashKey(task) - def uuid2 = processor.createTaskHashKey(task) - then: - // global var should *not* change task hash - processor.getTaskGlobalVars(task) >>> [ - [foo:'a', bar:'b'], - [bar:'b', foo:'a'] - ] - and: - uuid1 == uuid2 - - } - def 'should export env vars' () { given: @@ -387,34 +330,6 @@ class TaskProcessorTest extends Specification { } - def 'should get task directive vars' () { - given: - def processor = Spy(TaskProcessor) - processor.@config = Mock(ProcessConfig) - and: - def task = Mock(TaskRun) - and: - def config = new TaskConfig() - config.cpus = 4 - config.ext.alpha = 'AAAA' - config.ext.delta = { foo } - config.ext.omega = "${-> bar}" - and: - config.setContext( foo: 'DDDD', bar: 'OOOO' ) - - when: - def result = processor.getTaskExtensionDirectiveVars(task) - then: - 1 * task.getVariableNames() >> {[ 'task.cpus', 'task.ext.alpha', 'task.ext.delta', 'task.ext.omega' ] as Set} - 1 * task.getConfig() >> config - then: - result == [ - 'task.ext.alpha': 'AAAA', - 'task.ext.delta': 'DDDD', - 'task.ext.omega': 'OOOO', - ] - } - def 'should bind fair outputs' () { given: def processor = Spy(TaskProcessor) @@ -703,27 +618,4 @@ class TaskProcessorTest extends Specification { 0 * collector.collect(task) 1 * exec.submit(task) } - - def 'should compute eval outputs content deterministically'() { - - setup: - def processor = createProcessor('test', Mock(Session)) - - when: - def result1 = processor.computeEvalOutputsContent([ - 'nxf_out_eval_2': 'echo "value2"', - 'nxf_out_eval_1': 'echo "value1"', - 'nxf_out_eval_3': 'echo "value3"' - ]) - - def result2 = processor.computeEvalOutputsContent([ - 'nxf_out_eval_3': 'echo "value3"', - 'nxf_out_eval_1': 'echo "value1"', - 'nxf_out_eval_2': 'echo "value2"' - ]) - - then: - result1 == result2 - result1 == 'nxf_out_eval_1=echo "value1"\nnxf_out_eval_2=echo "value2"\nnxf_out_eval_3=echo "value3"' - } } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy index 2e2b621446..1c43132580 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy @@ -38,6 +38,7 @@ import nextflow.lineage.model.v1beta1.WorkflowOutput import nextflow.lineage.model.v1beta1.WorkflowRun import nextflow.file.FileHelper import nextflow.file.FileHolder +import nextflow.processor.TaskHasher import nextflow.processor.TaskRun import nextflow.script.ScriptMeta import nextflow.script.params.BaseParam @@ -257,8 +258,8 @@ class LinObserver implements TraceObserverV2 { normalizer.normalizePath(task.getCondaEnv()), normalizer.normalizePath(task.getSpackEnv()), task.config?.getArchitecture()?.toString(), - task.processor.getTaskGlobalVars(task), - task.processor.getTaskBinEntries(task.source).collect { Path p -> new DataPath( + new TaskHasher(task).getTaskGlobalVars(), + new TaskHasher(task).getTaskBinEntries(task.source).collect { Path p -> new DataPath( normalizer.normalizePath(p.normalize()), Checksum.ofNextflow(p) ) },