
Commit 540bd21

Merge pull request #141 from assemblerflow/unique_process_identifiers
Unique process identifiers
2 parents (d3fbf28 + 07bba78), commit 540bd21

3 files changed: 211 additions & 9 deletions


changelog.md

Lines changed: 1 addition & 0 deletions

@@ -9,6 +9,7 @@
 
 ### Bug fixes
 
+- Fixed forks with same source process name.
 - Fixed `inspect` issue when tasks took more than a day in duration.
 
 ## 1.3.1
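For context on the new entry: the fork bug appears when two lanes of a fork start with the same process name, since both occurrences were indistinguishable when the parser split the string on fork tokens. A minimal sketch of the affected shape, using a pipeline string taken from the new tests in this commit:

    import flowcraft.generator.pipeline_parser as ps

    # Both fork lanes start with a process named "B". Before this fix the
    # parser could attach a lane to the wrong "B"; it now tags each
    # occurrence with a unique numeric index while parsing.
    links = ps.parse_pipeline("A (B C (D | E)| B C (D | E))")
    # links is a list of {"input": {...}, "output": {...}} connection dicts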

flowcraft/generator/pipeline_parser.py

Lines changed: 123 additions & 9 deletions

@@ -83,6 +83,7 @@ def remove_inner_forks(text):
 
     return text
 
+
 def empty_tasks(p_string):
     """
     Function to check if pipeline string is empty or has an empty string
@@ -141,7 +142,8 @@ def brackets_insanity_check(p_string):
         raise SanityError(
             "A different number of '(' and ')' was specified. There are "
             "{} extra '{}'. The number of '(' and ')'should be equal.".format(
-                str(abs(p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
+                str(abs(
+                    p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
                 max_bracket))
 
 
@@ -261,7 +263,7 @@ def inner_fork_insanity_checks(pipeline_string):
 
     # first lets get all forks to a list.
    list_of_forks = []  # stores forks
-    left_indexes = [] # stores indexes of left brackets
+    left_indexes = []  # stores indexes of left brackets
 
     # iterate through the string looking for '(' and ')'.
     for pos, char in enumerate(pipeline_string):
@@ -337,8 +339,8 @@ def insanity_checks(pipeline_str):
 
 
 def parse_pipeline(pipeline_str):
-    """Parses a pipeline string into a dictionary with the connections between
-    process
+    """Parses a pipeline string into a list of dictionaries with the connections
+    between processes
 
     Parameters
     ----------
@@ -368,16 +370,25 @@ def parse_pipeline(pipeline_str):
     pipeline_links = []
     lane = 1
 
+    # Add unique identifiers to each process to allow a correct connection
+    # between forks with the same processes
+    pipeline_str_modified, identifiers_to_tags = add_unique_identifiers(
+        pipeline_str)
+
     # Get number of forks in the pipeline
-    nforks = pipeline_str.count(FORK_TOKEN)
+    nforks = pipeline_str_modified.count(FORK_TOKEN)
     logger.debug("Found {} fork(s)".format(nforks))
 
     # If there are no forks, connect the pipeline as purely linear
     if not nforks:
         logger.debug("Detected linear pipeline string : {}".format(
             pipeline_str))
-        linear_pipeline = ["__init__"] + pipeline_str.split()
+        linear_pipeline = ["__init__"] + pipeline_str_modified.split()
         pipeline_links.extend(linear_connection(linear_pipeline, lane))
+        # Remove the unique identifiers that were used to correctly assign
+        # fork parents that may share a process name
+        pipeline_links = remove_unique_identifiers(identifiers_to_tags,
+                                                   pipeline_links)
         return pipeline_links
 
     for i in range(nforks):
@@ -386,7 +397,7 @@ def parse_pipeline(pipeline_str):
         # Split the pipeline at each fork start position. fields[-1] will
         # hold the process after the fork. fields[-2] will hold the processes
         # before the fork.
-        fields = pipeline_str.split(FORK_TOKEN, i + 1)
+        fields = pipeline_str_modified.split(FORK_TOKEN, i + 1)
 
         # Get the processes before the fork. This may be empty when the
         # fork is at the beginning of the pipeline.
@@ -431,6 +442,8 @@ def parse_pipeline(pipeline_str):
 
         lane += len(fork_sink)
 
+    pipeline_links = remove_unique_identifiers(identifiers_to_tags,
+                                               pipeline_links)
     return pipeline_links
 
 
@@ -498,7 +511,6 @@ def get_lanes(lanes_str):
     # Flag used to determined whether the cursor is inside or outside the
     # right fork
     infork = 0
-
     for i in lanes_str:
 
         # Nested fork started
@@ -565,7 +577,7 @@ def linear_connection(plist, lane):
 
 def fork_connection(source, sink, source_lane, lane):
     """Makes the connection between a process and the first processes in the
-    lanes to wich it forks.
+    lanes to which it forks.
 
     The ``lane`` argument should correspond to the lane of the source process.
     For each lane in ``sink``, the lane counter will increase.
@@ -640,3 +652,105 @@ def linear_lane_connection(lane_list, lane):
         lane += 1
 
     return res
+
+
+def add_unique_identifiers(pipeline_str):
+    """Returns the pipeline string with unique identifiers and a dictionary with
+    references between the unique keys and the original values
+
+    Parameters
+    ----------
+    pipeline_str : str
+        Pipeline string
+
+    Returns
+    -------
+    str
+        Pipeline string with unique identifiers
+    dict
+        Match between the process unique identifiers and the original names
+    """
+
+    # Add a space at the beginning and end of the pipeline to allow regex
+    # mapping of the final process in linear pipelines
+    pipeline_str_modified = " {} ".format(pipeline_str)
+
+    # Regex to get all process names: catch all words without spaces that
+    # are not fork tokens or pipes
+    reg_find_proc = r"[^\s{}{}{}]+".format(LANE_TOKEN, FORK_TOKEN, CLOSE_TOKEN)
+    process_names = re.findall(reg_find_proc, pipeline_str_modified)
+
+    identifiers_to_tags = {}
+    """
+    dict: Matches new process names (identifiers) with original process
+    names
+    """
+
+    new_process_names = []
+    """
+    list: New process names used to replace in the pipeline string
+    """
+
+    # Assign the new process names by appending a numeric id at the end of
+    # the process name
+    for index, val in enumerate(process_names):
+        if "=" in val:
+            parts = val.split("=")
+            new_id = "{}_{}={}".format(parts[0], index, parts[1])
+        else:
+            new_id = "{}_{}".format(val, index)
+
+        # Add the new process name with its id
+        new_process_names.append(new_id)
+        # Match the new process name to the original process name
+        identifiers_to_tags[new_id] = val
+
+    # Add a space between forks, pipes and the process names for the
+    # replace regex to work
+    match_result = lambda match: " {} ".format(match.group())
+
+    # Force a space around each token so that the regex replacement below
+    # can be applied
+    find = r'[{}{}{}]+'.format(FORK_TOKEN, LANE_TOKEN, CLOSE_TOKEN)
+    pipeline_str_modified = re.sub(find, match_result, pipeline_str_modified)
+
+    # Replace the original process names by the unique identifiers
+    for index, val in enumerate(process_names):
+        # Regex to replace process names that have not yet been assigned an
+        # id. Escape characters must be doubled so the pattern matches the
+        # dict keys (identifiers_to_tags) literally.
+        find = r'{}[^_]'.format(val).replace("\\", "\\\\")
+        pipeline_str_modified = re.sub(find, new_process_names[index] + " ",
+                                       pipeline_str_modified, 1)
+
+    return pipeline_str_modified, identifiers_to_tags
+
+
+def remove_unique_identifiers(identifiers_to_tags, pipeline_links):
+    """Removes the unique identifiers and restores the original process names
+    in the already parsed pipelines
+
+    Parameters
+    ----------
+    identifiers_to_tags : dict
+        Match between unique process identifiers and process names
+    pipeline_links : list
+        Parsed pipeline list with unique identifiers
+
+    Returns
+    -------
+    list
+        Pipeline list with the original identifiers
+    """
+
+    # Replace the unique identifiers with the original process names
+    for index, val in enumerate(pipeline_links):
+        if val["input"]["process"] != "__init__":
+            val["input"]["process"] = identifiers_to_tags[
+                val["input"]["process"]]
+        if val["output"]["process"] != "__init__":
+            val["output"]["process"] = identifiers_to_tags[
+                val["output"]["process"]]
+
+    return pipeline_links
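
To make the two new helpers concrete, here is a small usage sketch; the expected string and mapping mirror the values asserted in the new tests below:

    import flowcraft.generator.pipeline_parser as ps

    # Tag every process occurrence with its positional index so that equal
    # names in different fork lanes become distinguishable.
    modified, ids = ps.add_unique_identifiers("A (B C (D | E)| B C (D | E))")
    # modified, up to spacing: "A_0 (B_1 C_2 (D_3 | E_4)| B_5 C_6 (D_7 | E_8))"
    # ids maps identifiers back to tags, e.g. {"B_1": "B", "B_5": "B", ...}

    # Once the links are built, parse_pipeline strips the suffixes again:
    # pipeline_links = ps.remove_unique_identifiers(ids, pipeline_links)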

flowcraft/tests/test_pipeline_parser.py

Lines changed: 87 additions & 0 deletions

@@ -1,4 +1,5 @@
 import os
+import json
 
 import flowcraft.generator.pipeline_parser as ps
 from flowcraft.tests.data_pipelines import pipelines as pipes
@@ -258,3 +259,89 @@ def test_parse_pipeline_file():
     res = ps.parse_pipeline(p_path)
     print(res)
     assert res == expected
+
+def test_unique_id_len():
+
+    pip_list = [
+        "A B C",
+        "A (B C (D | E)| B C (D | E))",
+        "A (B C (D | E)| C (D | E))",
+        "A (B C (D | E)| B (D | E))",
+    ]
+
+    res_list = [
+        "A_0 B_1 C_2",
+        "A_0 (B_1 C_2 (D_3 | E_4)| B_5 C_6 (D_7 | E_8))",
+        "A_0 (B_1 C_2 (D_3 | E_4)| C_5 (D_6 | E_7))",
+        "A_0 (B_1 C_2 (D_3 | E_4)| B_5 (D_6 | E_7))",
+    ]
+
+    for x, pip_str in enumerate(pip_list):
+        res_str, res_ids = ps.add_unique_identifiers(pip_str)
+        assert res_str.replace(" ", "") == res_list[x].replace(" ", "")
+
+def test_remove_id():
+
+    pip_list = [
+        "A B C",
+        "A (B C (D | E)| B C (D | E))",
+    ]
+
+    pipeline_mod_links = [
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A_0', 'lane': 1}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_1', 'lane': 1}},
+         {'input': {'process': 'B_1', 'lane': 1},
+          'output': {'process': 'C_2', 'lane': 1}}],
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A_0', 'lane': 1}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_1', 'lane': 2}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_5', 'lane': 3}},
+         {'input': {'process': 'B_1', 'lane': 2},
+          'output': {'process': 'C_2', 'lane': 2}},
+         {'input': {'process': 'B_5', 'lane': 3},
+          'output': {'process': 'C_6', 'lane': 3}},
+         {'input': {'process': 'C_2', 'lane': 2},
+          'output': {'process': 'D_3', 'lane': 4}},
+         {'input': {'process': 'C_2', 'lane': 2},
+          'output': {'process': 'E_4', 'lane': 5}},
+         {'input': {'process': 'C_6', 'lane': 3},
+          'output': {'process': 'D_7', 'lane': 6}},
+         {'input': {'process': 'C_6', 'lane': 3},
+          'output': {'process': 'E_8', 'lane': 7}}]
+    ]
+
+    pipeline_exp_links = [
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A', 'lane': 1}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 1}},
+         {'input': {'process': 'B', 'lane': 1},
+          'output': {'process': 'C', 'lane': 1}}],
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A', 'lane': 1}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 2}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 3}},
+         {'input': {'process': 'B', 'lane': 2},
+          'output': {'process': 'C', 'lane': 2}},
+         {'input': {'process': 'B', 'lane': 3},
+          'output': {'process': 'C', 'lane': 3}},
+         {'input': {'process': 'C', 'lane': 2},
+          'output': {'process': 'D', 'lane': 4}},
+         {'input': {'process': 'C', 'lane': 2},
+          'output': {'process': 'E', 'lane': 5}},
+         {'input': {'process': 'C', 'lane': 3},
+          'output': {'process': 'D', 'lane': 6}},
+         {'input': {'process': 'C', 'lane': 3},
+          'output': {'process': 'E', 'lane': 7}}]
+    ]
+
+    for x, pip_str in enumerate(pip_list):
+        res_str, res_ids = ps.add_unique_identifiers(pip_str)
+        res = ps.remove_unique_identifiers(res_ids, pipeline_mod_links[x])
+        assert json.dumps(res) == json.dumps(pipeline_exp_links[x])
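
A condensed version of what test_remove_id asserts, runnable on its own (a sketch assuming the flowcraft package is importable):

    import flowcraft.generator.pipeline_parser as ps

    # Round trip: uniquify a linear pipeline, then map one parsed link back.
    _, ids = ps.add_unique_identifiers("A B C")
    # ids == {"A_0": "A", "B_1": "B", "C_2": "C"}
    links = [{"input": {"process": "__init__", "lane": 1},
              "output": {"process": "A_0", "lane": 1}}]
    assert ps.remove_unique_identifiers(ids, links)[0]["output"]["process"] == "A"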
