|
8 | 8 | import uuid |
9 | 9 | import warnings |
10 | 10 | from collections import defaultdict, deque |
11 | | -from collections.abc import Callable, Mapping |
12 | 11 | from copy import copy, deepcopy |
13 | 12 | from dataclasses import dataclass |
14 | 13 | from enum import Enum |
|
45 | 44 | from linkml_runtime.utils.pattern import PatternResolver |
46 | 45 |
|
47 | 46 | if TYPE_CHECKING: |
48 | | - from collections.abc import Mapping |
| 47 | + from collections.abc import Callable, Iterable, Mapping |
49 | 48 | from types import NotImplementedType |
50 | 49 |
|
51 | 50 | from linkml_runtime.utils.metamodelcore import URI, URIorCURIE |
@@ -92,13 +91,90 @@ class OrderedBy(Enum): |
92 | 91 | """ |
93 | 92 |
|
94 | 93 |
|
| 94 | +WHITE = 0 |
| 95 | +GREY = 1 |
| 96 | +BLACK = 2 |
| 97 | + |
| 98 | + |
| 99 | +def detect_cycles(f: Callable[[Any], Iterable[Any] | None], x: Any) -> None: |
| 100 | + """Detect cycles in a graph, using function `f` to walk the graph, starting at node `x`. |
| 101 | +
|
| 102 | + Uses the classic white/grey/black colour coding algorithm to track which nodes have been explored. In this |
| 103 | + case, "node" refers to any element in a schema and "neighbours" are elements that can be reached from that |
| 104 | + node by executing function `f`. |
| 105 | +
|
| 106 | + WHITE: unexplored |
| 107 | + GREY: node is being processed; processing includes exploring all neighbours reachable via f(node) |
| 108 | + BLACK: node and all of its neighbours (and their neighbours, etc.) have been processed |
| 109 | +
|
| 110 | + A directed cycle reachable from node `x` raises a ValueError. |
| 111 | +
|
| 112 | + :param f: function that returns an iterable of neighbouring nodes (parents or children) |
| 113 | + :type f: Callable[[Any], Iterable[Any] | None] |
| 114 | + :param x: graph node |
| 115 | + :type x: Any |
| 116 | + :raises ValueError: if a cycle is discovered through repeated calls to f(x) |
| 117 | + """ |
| 118 | + # keep track of the processing state of nodes in the graph |
| 119 | + processing_state: dict[Any, int] = {} |
| 120 | + |
| 121 | + # Stack entries are (node, processed_flag). |
| 122 | + # processed_flag == True means all neighbours (nodes generated by running `f(node)`) |
| 123 | + # have been added to the todo stack and the node can be marked BLACK. |
| 124 | + todo: list[tuple[Any, bool]] = [(x, False)] |
| 125 | + |
| 126 | + while todo: |
| 127 | + node, processed_flag = todo.pop() |
| 128 | + |
| 129 | + if processed_flag: |
| 130 | + # all neighbours have been processed |
| 131 | + processing_state[node] = BLACK |
| 132 | + continue |
| 133 | + |
| 134 | + # check the state of this node |
| 135 | + node_state = processing_state.get(node, WHITE) |
| 136 | + |
| 137 | + if node_state == GREY: |
| 138 | + # this node was already being processed |
| 139 | + # we have discovered an edge back to that node - i.e. a cycle |
| 140 | + err_msg = f"Cycle detected at node {node!r}" |
| 141 | + raise ValueError(err_msg) |
| 142 | + |
| 143 | + if node_state == BLACK: |
| 144 | + # already fully explored - nothing to do |
| 145 | + continue |
| 146 | + |
| 147 | + # mark the node as being processed (GREY) and set the processed_flag to True |
| 148 | + processing_state[node] = GREY |
| 149 | + todo.append((node, True)) |
| 150 | + |
| 151 | + # push the neighbours on to the processing stack |
| 152 | + todo.extend((child, False) for child in f(node) or []) |
| 153 | + |
| 154 | + |
95 | 155 | def _closure( |
96 | | - f: Callable, |
97 | | - x, |
| 156 | + f: Callable[[Any], Iterable[Any] | None], |
| 157 | + x: Any, |
98 | 158 | reflexive: bool = True, |
99 | 159 | depth_first: bool = True, |
100 | | - **kwargs: dict[str, Any] | None, # noqa: ARG001 |
| 160 | + **kwargs: dict[str, Any] | None, |
101 | 161 | ) -> list[str | ElementName | ClassDefinitionName | EnumDefinitionName | SlotDefinitionName | TypeDefinitionName]: |
| 162 | + """Walk the graph using function `f` and generate the closure. |
| 163 | +
|
| 164 | + :param f: function that returns an iterable of neighbouring nodes (parents or children) |
| 165 | + :type f: Callable[[Any], Iterable[Any] | None] |
| 166 | + :param x: start node |
| 167 | + :type x: Any |
| 168 | + :param reflexive: assume the graph is reflexive, defaults to True |
| 169 | + :type reflexive: bool, optional |
| 170 | + :param depth_first: depth first traversal, defaults to True |
| 171 | + :type depth_first: bool, optional |
| 172 | + :return: list of nodes |
| 173 | + :rtype: list[str | ElementName | ClassDefinitionName | EnumDefinitionName | SlotDefinitionName | TypeDefinitionName] |
| 174 | + """ |
| 175 | + if kwargs and kwargs.get("detect_cycles"): |
| 176 | + detect_cycles(f, x) |
| 177 | + |
102 | 178 | rv = [x] if reflexive else [] |
103 | 179 | visited = [] |
104 | 180 | todo = [x] |
|
0 commit comments