Skip to content

Commit 07083f2

Browse files
authored
Clear checkpoint (#132)
* Added comments * Started implementing component specific input clear * Finished implementing component specific input clear * Added condition to remove input only after successful execution * Added more date information in `inspect` mode, including the year and the locale of the executing system. * Added changelog entry
1 parent 10633e9 commit 07083f2

27 files changed

+361
-38
lines changed

changelog.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,15 @@
22

33
## Changes in upcoming release (`dev` branch)
44

5+
### Features
6+
7+
- Added a new `clearInput` parameter to components that change their input.
8+
The aim of this option is to allow the controlled removal of temporary files,
9+
which is particularly useful in very large workflows.
10+
511
### Components changes
612

7-
Updated images for components `mash_dist`, `mash_screen` and
13+
- Updated images for components `mash_dist`, `mash_screen` and
814
`mapping_patlas`.
915

1016
### New components
@@ -14,6 +20,8 @@ Updated images for components `mash_dist`, `mash_screen` and
1420

1521
- Added `--export-directives` option to `build` mode to export component's
1622
directives in JSON format to standard output.
23+
- Added more date information in `inspect` mode, including the year and the
24+
locale of the executing system.
1725

1826
## 1.3.0
1927

flowcraft/generator/components/assembly.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ def __init__(self, **kwargs):
5050
"If 'auto' the SPAdes k-mer lengths will be determined "
5151
"from the maximum read length of each assembly. If "
5252
"'default', SPAdes will use the default k-mer lengths. "
53+
},
54+
"clearInput": {
55+
"default": "false",
56+
"description":
57+
"Permanently removes temporary input files. This option "
58+
"is only useful to remove temporary files in large "
59+
"workflows and prevents nextflow's resume functionality. "
60+
"Use with caution."
5361
}
5462
}
5563

@@ -81,6 +89,18 @@ def __init__(self, **kwargs):
8189
"scratch": "true"
8290
}}
8391

92+
self.params = {
93+
"clearInput": {
94+
"default": "false",
95+
"description":
96+
"Permanently removes temporary input files. This option "
97+
"is only useful to remove temporary files in large "
98+
"workflows and prevents nextflow's resume functionality. "
99+
"Use with caution."
100+
}
101+
}
102+
103+
84104
class ViralAssembly(Process):
85105
"""
86106
Process to assemble viral genomes, based on SPAdes and megahit
@@ -95,7 +115,8 @@ def __init__(self, **kwargs):
95115

96116
self.dependencies = ["integrity_coverage"]
97117

98-
self.status_channels = ["va_spades" , "va_megahit", "report_viral_assembly"]
118+
self.status_channels = ["va_spades", "va_megahit",
119+
"report_viral_assembly"]
99120

100121
self.link_end.append({"link": "SIDE_max_len", "alias": "SIDE_max_len"})
101122

@@ -105,7 +126,7 @@ def __init__(self, **kwargs):
105126
"container": "flowcraft/viral_assembly",
106127
"version": "0.1-1",
107128
"scratch": "true"
108-
},"va_megahit": {
129+
}, "va_megahit": {
109130
"cpus": 4,
110131
"memory": "{ 5.GB * task.attempt }",
111132
"container": "flowcraft/viral_assembly",
@@ -146,5 +167,13 @@ def __init__(self, **kwargs):
146167
"from the maximum read length of each assembly. If "
147168
"'default', megahit will use the default k-mer lengths. "
148169
"(default: $params.megahitKmers)"
170+
},
171+
"clearInput": {
172+
"default": "false",
173+
"description":
174+
"Permanently removes temporary input files. This option "
175+
"is only useful to remove temporary files in large "
176+
"workflows and prevents nextflow's resume functionality. "
177+
"Use with caution."
149178
}
150-
}
179+
}

flowcraft/generator/components/assembly_processing.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,17 @@ def __init__(self, **kwargs):
204204
self.link_end.append({"link": "SIDE_BpCoverage",
205205
"alias": "SIDE_BpCoverage"})
206206

207+
self.params = {
208+
"clearInput": {
209+
"default": "false",
210+
"description":
211+
"Permanently removes temporary input files. This option "
212+
"is only useful to remove temporary files in large "
213+
"workflows and prevents nextflow's resume functionality. "
214+
"Use with caution."
215+
}
216+
}
217+
207218
self.directives = {
208219
"pilon": {
209220
"cpus": 4,

flowcraft/generator/components/mapping.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ def __init__(self, **kwargs):
3232
"default": "null",
3333
"description": "Specifies the reference indexes to be provided "
3434
"to bowtie2."
35+
},
36+
"clearInput": {
37+
"default": "false",
38+
"description":
39+
"Permanently removes temporary input files. This option "
40+
"is only useful to remove temporary files in large "
41+
"workflows and prevents nextflow's resume functionality. "
42+
"Use with caution."
3543
}
3644
}
3745

@@ -55,8 +63,10 @@ def __init__(self, **kwargs):
5563
"report_bowtie"
5664
]
5765

66+
5867
class Retrieve_mapped(Process):
59-
"""Samtools process to to align short paired-end sequencing reads to long reference sequences
68+
"""Samtools process to align short paired-end sequencing reads to
69+
long reference sequences
6070
6171
This process is set with:
6272
@@ -74,6 +84,14 @@ def __init__(self, **kwargs):
7484
self.output_type = "fastq"
7585

7686
self.params = {
87+
"clearInput": {
88+
"default": "false",
89+
"description":
90+
"Permanently removes temporary input files. This option "
91+
"is only useful to remove temporary files in large "
92+
"workflows and prevents nextflow's resume functionality. "
93+
"Use with caution."
94+
}
7795
}
7896

7997
self.dependencies = ["bowtie"]

flowcraft/generator/components/metagenomics.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
except ImportError:
55
from flowcraft.generator.process import Process
66

7+
78
class Kraken(Process):
89
"""kraken process template interface
910
@@ -80,6 +81,14 @@ def __init__(self, **kwargs):
8081
"default": 0.9,
8182
"description": "probability threshold for EM final classification."
8283
"Default: 0.9"
84+
},
85+
"clearInput": {
86+
"default": "false",
87+
"description":
88+
"Permanently removes temporary input files. This option "
89+
"is only useful to remove temporary files in large "
90+
"workflows and prevents nextflow's resume functionality. "
91+
"Use with caution."
8392
}
8493
}
8594

@@ -129,6 +138,14 @@ def __init__(self, **kwargs):
129138
"from the maximum read length of each assembly. If "
130139
"'default', megahit will use the default k-mer lengths. "
131140
"(default: $params.megahitKmers)"
141+
},
142+
"clearInput": {
143+
"default": "false",
144+
"description":
145+
"Permanently removes temporary input files. This option "
146+
"is only useful to remove temporary files in large "
147+
"workflows and prevents nextflow's resume functionality. "
148+
"Use with caution."
132149
}
133150
}
134151

@@ -173,6 +190,14 @@ def __init__(self, **kwargs):
173190
"from the maximum read length of each assembly. If "
174191
"'default', metaSPAdes will use the default k-mer lengths. "
175192
"(default: $params.metaspadesKmers)"
193+
},
194+
"clearInput": {
195+
"default": "false",
196+
"description":
197+
"Permanently removes temporary input files. This option "
198+
"is only useful to remove temporary files in large "
199+
"workflows and prevents nextflow's resume functionality. "
200+
"Use with caution."
176201
}
177202
}
178203

@@ -245,6 +270,14 @@ def __init__(self, **kwargs):
245270
"default": "'/index_hg19/hg19'",
246271
"description": "Specifies the reference indexes to be provided "
247272
"to bowtie2."
273+
},
274+
"clearInput": {
275+
"default": "false",
276+
"description":
277+
"Permanently removes temporary input files. This option "
278+
"is only useful to remove temporary files in large "
279+
"workflows and prevents nextflow's resume functionality. "
280+
"Use with caution."
248281
}
249282
}
250283

flowcraft/generator/components/reads_quality_control.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,14 @@ def __init__(self, **kwargs):
215215
"default": "55",
216216
"description":
217217
"Drop the read if it is below a specified length "
218+
},
219+
"clearInput": {
220+
"default": "false",
221+
"description":
222+
"Permanently removes temporary input files. This option "
223+
"is only useful to remove temporary files in large "
224+
"workflows and prevents nextflow's resume functionality. "
225+
"Use with caution."
218226
}
219227
}
220228

@@ -292,6 +300,14 @@ def __init__(self, **kwargs):
292300
"default": "55",
293301
"description":
294302
"Drop the read if it is below a specified length."
303+
},
304+
"clearInput": {
305+
"default": "false",
306+
"description":
307+
"Permanently removes temporary input files. This option "
308+
"is only useful to remove temporary files in large "
309+
"workflows and prevents nextflow's resume functionality. "
310+
"Use with caution."
295311
}
296312
}
297313

@@ -334,9 +350,18 @@ def __init__(self, **kwargs):
334350
"default": "'A 50%; T 50%; N 50%'",
335351
"description":
336352
"Pattern to filter the reads. Please separate parameter"
337-
"values with a space and separate new parameter sets with semicolon (;)."
338-
"Parameters are defined by two values: the pattern (any combination of the"
339-
"letters ATCGN), and the number of repeats or percentage of occurence."
353+
"values with a space and separate new parameter sets with"
354+
" semicolon (;). Parameters are defined by two values: "
355+
"the pattern (any combination of the letters ATCGN), and "
356+
"the number of repeats or percentage of occurence."
357+
},
358+
"clearInput": {
359+
"default": "false",
360+
"description":
361+
"Permanently removes temporary input files. This option "
362+
"is only useful to remove temporary files in large "
363+
"workflows and prevents nextflow's resume functionality. "
364+
"Use with caution."
340365
}
341366
}
342367

@@ -381,6 +406,14 @@ def __init__(self, **kwargs):
381406
"description":
382407
"Maximum estimated depth coverage allowed. FastQ with "
383408
"higher estimated depth will be subsampled to this value."
409+
},
410+
"clearInput": {
411+
"default": "false",
412+
"description":
413+
"Permanently removes temporary input files. This option "
414+
"is only useful to remove temporary files in large "
415+
"workflows and prevents nextflow's resume functionality. "
416+
"Use with caution."
384417
}
385418
}
386419

@@ -393,4 +426,4 @@ def __init__(self, **kwargs):
393426

394427
self.status_channels = [
395428
"downsample_fastq"
396-
]
429+
]

flowcraft/generator/components/typing.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,17 @@ def __init__(self, **kwargs):
106106

107107
self.link_end.append({"link": "__fastq", "alias": "_LAST_fastq"})
108108

109+
self.params = {
110+
"clearInput": {
111+
"default": "false",
112+
"description":
113+
"Permanently removes temporary input files. This option "
114+
"is only useful to remove temporary files in large "
115+
"workflows and prevents nextflow's resume functionality. "
116+
"Use with caution."
117+
}
118+
}
119+
109120
self.directives = {
110121
"momps": {
111122
"cpus": 3,

flowcraft/generator/inspect.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import re
22
import os
33
import sys
4+
import time
45
import curses
56
import signal
67
import locale
@@ -1378,6 +1379,13 @@ def _send_status_info(self, run_id):
13781379
general_details = self._prepare_general_details()
13791380
status_data = self._prepare_run_status_data()
13801381

1382+
# Add current year to start and stop dates
1383+
time_start = "{} {}".format(time.strftime("%Y"), self.time_start)
1384+
time_stop = "{} {}".format(time.strftime("%Y"), self.time_stop) \
1385+
if self.time_stop else "-"
1386+
# Get the system locale name so the dates can be parsed properly
1387+
time_locale = locale.getlocale()[0]
1388+
13811389
status_json = {
13821390
"generalOverview": overview_data,
13831391
"generalDetails": general_details,
@@ -1386,8 +1394,9 @@ def _send_status_info(self, run_id):
13861394
"processInfo": self._convert_process_dict(),
13871395
"processTags": self.process_tags,
13881396
"runStatus": status_data,
1389-
"timeStart": str(self.time_start),
1390-
"timeStop": str(self.time_stop) if self.time_stop else "-",
1397+
"timeStart": time_start,
1398+
"timeStop": time_stop,
1399+
"timeLocale": time_locale,
13911400
"processes": list(self.processes)
13921401
}
13931402

flowcraft/generator/templates/bowtie.nf

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ if (params.index{{ param_id }} == null && params.reference{{ param_id }} == null
55
exit 1, "Provide only an index OR a reference fasta file."
66
}
77

8+
clear = params.clearInput{{ param_id }} ? "true" : "false"
9+
checkpointClear_{{ pid }} = Channel.value(clear)
810

911
if (params.reference{{ param_id }}){
1012

@@ -61,7 +63,23 @@ process bowtie_{{ pid }} {
6163

6264
script:
6365
"""
64-
bowtie2 -x $index -1 ${fastq_pair[0]} -2 ${fastq_pair[1]} -p $task.cpus 1> ${sample_id}.bam 2> ${sample_id}_bowtie2.log
66+
{
67+
bowtie2 -x $index -1 ${fastq_pair[0]} -2 ${fastq_pair[1]} -p $task.cpus 1> ${sample_id}.bam 2> ${sample_id}_bowtie2.log
68+
69+
if [ "$clear" = "true" ];
70+
then
71+
work_regex=".*/work/.{2}/.{30}/.*"
72+
file_source1=\$(readlink -f \$(pwd)/${fastq_pair[0]})
73+
file_source2=\$(readlink -f \$(pwd)/${fastq_pair[1]})
74+
if [[ "\$file_source1" =~ \$work_regex ]]; then
75+
rm \$file_source1 \$file_source2
76+
fi
77+
fi
78+
79+
echo pass > .status
80+
} || {
81+
echo fail > .status
82+
}
6583
"""
6684
}
6785

flowcraft/generator/templates/downsample_fastq.nf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ IN_genome_size_{{ pid }} = Channel.value(params.genomeSize{{ param_id }})
55
IN_depth_{{ pid }} = Channel.value(params.depth{{ param_id }})
66
.map{it -> it.toString().isNumber() ? it : exit(1, "The depth parameter must be a number or a float. Provided value: '${params.depth{{ param_id }}}'")}
77

8-
clear = params.clearAtCheckpoint ? "true" : "false"
8+
clear = params.clearInput{{ param_id }} ? "true" : "false"
99
checkpointClear_{{ pid }} = Channel.value(clear)
1010

1111
process downsample_fastq_{{ pid }} {

0 commit comments

Comments
 (0)