88import uuid
99import warnings
1010from collections import defaultdict , deque
11+ from collections .abc import Iterable
1112from copy import copy , deepcopy
1213from dataclasses import dataclass
1314from enum import Enum
4445from linkml_runtime .utils .pattern import PatternResolver
4546
4647if TYPE_CHECKING :
47- from collections .abc import Callable , Iterable , Mapping
48+ from collections .abc import Callable , Mapping
4849 from types import NotImplementedType
4950
5051 from linkml_runtime .utils .metamodelcore import URI , URIorCURIE
@@ -96,8 +97,13 @@ class OrderedBy(Enum):
9697BLACK = 2
9798
9899
99- def detect_cycles (f : Callable [[Any ], Iterable [Any ] | None ], x : Any ) -> None :
100- """Detect cycles in a graph, using function `f` to walk the graph, starting at node `x`.
100+ def detect_cycles (
101+ f : Callable [[Any ], Iterable [Any ] | None ],
102+ node_list : Iterable [Any ],
103+ ) -> None :
104+ """Detect cycles in a graph, using function `f` to walk the graph.
105+
106+ Input is supplied as a list of nodes that are used to populate the `todo` stack.
101107
102108 Uses the classic white/grey/black colour coding algorithm to track which nodes have been explored. In this
103109 case, "node" refers to any element in a schema and "neighbours" are elements that can be reached from that
@@ -107,21 +113,28 @@ def detect_cycles(f: Callable[[Any], Iterable[Any] | None], x: Any) -> None:
107113 GREY: node is being processed; processing includes exploring all neighbours reachable via f(node)
108114 BLACK: node and all of its neighbours (and their neighbours, etc.) have been processed
109115
110- A directed cycle reachable from node `x` raises a ValueError.
116+ A directed cycle reachable from a node or its neighbours raises a ValueError.
111117
112118 :param f: function that returns an iterable of neighbouring nodes (parents or children)
113119 :type f: Callable[[Any], Iterable[Any] | None]
114- :param x: graph node
115- :type x: Any
116- :raises ValueError: if a cycle is discovered through repeated calls to f(x )
120+ :param node_list: list or other iterable of values to process
121+ :type node_list: Iterable[ Any]
122+ :raises ValueError: if a cycle is discovered through repeated calls to f(node )
117123 """
124+ # ensure we have some nodes to start the analysis
125+ if not node_list or not isinstance (node_list , Iterable ) or isinstance (node_list , str ):
126+ err_msg = "detect_cycles requires a list of values to process"
127+ raise ValueError (err_msg )
128+
118129 # keep track of the processing state of nodes in the graph
119130 processing_state : dict [Any , int ] = {}
120131
121132 # Stack entries are (node, processed_flag).
122133 # processed_flag == True means all neighbours (nodes generated by running `f(node)`)
123134 # have been added to the todo stack and the node can be marked BLACK.
124- todo : list [tuple [Any , bool ]] = [(x , False )]
135+
136+ # initialise the todo stack with entries set to False
137+ todo : list [tuple [Any , bool ]] = [(node , False ) for node in node_list ]
125138
126139 while todo :
127140 node , processed_flag = todo .pop ()
@@ -173,7 +186,7 @@ def _closure(
173186 :rtype: list[str | ElementName | ClassDefinitionName | EnumDefinitionName | SlotDefinitionName | TypeDefinitionName]
174187 """
175188 if kwargs and kwargs .get ("detect_cycles" ):
176- detect_cycles (f , x )
189+ detect_cycles (f , [ x ] )
177190
178191 rv = [x ] if reflexive else []
179192 visited = []
@@ -452,8 +465,7 @@ def imports_closure(
452465 # visit item
453466 sn = todo .pop ()
454467 if sn not in self .schema_map :
455- imported_schema = self .load_import (sn )
456- self .schema_map [sn ] = imported_schema
468+ self .schema_map [sn ] = self .load_import (sn )
457469
458470 # resolve item's imports if it has not been visited already
459471 # we will get duplicates, but not cycles this way, and
@@ -886,7 +898,9 @@ def _parents(self, e: Element, imports: bool = True, mixins: bool = True, is_a:
886898 def class_parents (
887899 self , class_name : CLASS_NAME , imports : bool = True , mixins : bool = True , is_a : bool = True
888900 ) -> list [ClassDefinitionName ]:
889- """:param class_name: child class name
901+ """Get the parents of a class.
902+
903+ :param class_name: child class name
890904 :param imports: include import closure
891905 :param mixins: include mixins (default is True)
892906 :return: all direct parent class names (is_a and mixins)
@@ -1139,11 +1153,24 @@ def type_ancestors(
11391153 ** kwargs ,
11401154 )
11411155
1156+ @lru_cache (None )
1157+ def type_roots (self , imports : bool = True ) -> list [TypeDefinitionName ]:
1158+ """Return all types that have no parents.
1159+
1160+ :param imports: whether or not to include imports, defaults to True
1161+ :type imports: bool, optional
1162+ :return: list of all root types
1163+ :rtype: list[TypeDefinitionName]
1164+ """
1165+ return [t for t in self .all_types (imports = imports ) if not self .type_parents (t , imports = imports )]
1166+
11421167 @lru_cache (None )
11431168 def enum_parents (
11441169 self , enum_name : ENUM_NAME , imports : bool = False , mixins : bool = False , is_a : bool = True
11451170 ) -> list [EnumDefinitionName ]:
1146- """:param enum_name: child enum name
1171+ """Get the parents of an enum.
1172+
1173+ :param enum_name: child enum name
11471174 :param imports: include import closure (False)
11481175 :param mixins: include mixins (default is False)
11491176 :return: all direct parent enum names (is_a and mixins)
@@ -1180,12 +1207,20 @@ def enum_ancestors(
11801207 ** kwargs ,
11811208 )
11821209
1183- @lru_cache ( None )
1210+ @deprecated ( "Use `permissible_value_parents` instead" )
11841211 def permissible_value_parent (
11851212 self , permissible_value : str , enum_name : ENUM_NAME
11861213 ) -> list [str | PermissibleValueText ]:
1187- """:param enum_name: child enum name
1214+ return self .permissible_value_parents (permissible_value , enum_name )
1215+
1216+ @lru_cache (None )
1217+ def permissible_value_parents (
1218+ self , permissible_value : str , enum_name : ENUM_NAME
1219+ ) -> list [str | PermissibleValueText ]:
1220+ """Get the parents of a permissible value.
1221+
11881222 :param permissible_value: permissible value
1223+ :param enum_name: enum for which this is a permissible value
11891224 :return: all direct parent enum names (is_a)
11901225 """
11911226 enum = self .get_enum (enum_name , strict = True )
@@ -1199,8 +1234,10 @@ def permissible_value_parent(
11991234 def permissible_value_children (
12001235 self , permissible_value : str , enum_name : ENUM_NAME
12011236 ) -> list [str | PermissibleValueText ]:
1202- """:param enum_name: parent enum name
1237+ """Get the children of a permissible value.
1238+
12031239 :param permissible_value: permissible value
1240+ :param enum_name: enum for which this is a permissible value
12041241 :return: all direct child permissible values (is_a)
12051242 """
12061243 enum = self .get_enum (enum_name , strict = True )
@@ -1238,7 +1275,7 @@ def permissible_value_ancestors(
12381275 :rtype: list[str]
12391276 """
12401277 return _closure (
1241- lambda x : self .permissible_value_parent (x , enum_name ),
1278+ lambda x : self .permissible_value_parents (x , enum_name ),
12421279 permissible_value_text ,
12431280 reflexive = reflexive ,
12441281 depth_first = depth_first ,
@@ -1266,17 +1303,6 @@ def permissible_value_descendants(
12661303 ** kwargs ,
12671304 )
12681305
1269- @lru_cache (None )
1270- def type_roots (self , imports : bool = True ) -> list [TypeDefinitionName ]:
1271- """Return all types that have no parents.
1272-
1273- :param imports: whether or not to include imports, defaults to True
1274- :type imports: bool, optional
1275- :return: list of all root types
1276- :rtype: list[TypeDefinitionName]
1277- """
1278- return [t for t in self .all_types (imports = imports ) if not self .type_parents (t , imports = imports )]
1279-
12801306 @lru_cache (None )
12811307 def is_multivalued (self , slot_name : SlotDefinition ) -> bool :
12821308 """Return True if slot is multivalued, else returns False.
@@ -1818,6 +1844,33 @@ def get_type_designator_slot(self, cn: CLASS_NAME, imports: bool = True) -> Slot
18181844 return s
18191845 return None
18201846
1847+ @lru_cache (None )
1848+ def _get_string_type (self ) -> TypeDefinition :
1849+ """Get the type used for representing strings.
1850+
1851+ This can be used for (e.g.) retrieving the appropriate type for a slot where the range is an enum.
1852+
1853+ The method assumes that the string type will either be called "string" or
1854+ will have the URI "xsd:string", as is the case for the "string" type in linkml:types.
1855+
1856+ This method throws an error if there is anything other than one type that fits the criteria.
1857+
1858+ :return: the "string" type object
1859+ :rtype: TypeDefinition
1860+ """
1861+ str_type = self .get_type ("string" )
1862+ if str_type :
1863+ return str_type
1864+
1865+ # if there isn't a type named "string", search for a type with the URI xsd:string
1866+ str_type_arr = [v for v in self .all_types ().values () if v .uri == "xsd:string" ]
1867+ if len (str_type_arr ) == 1 :
1868+ return str_type_arr [0 ]
1869+
1870+ # zero or more than one potential "string" type found
1871+ err_msg = f"Cannot find a suitable 'string' type: no types with name 'string' { 'and more than one type with' if str_type_arr else 'or' } uri 'xsd:string'."
1872+ raise ValueError (err_msg )
1873+
18211874 def is_inlined (self , slot : SlotDefinition , imports : bool = True ) -> bool :
18221875 """Return true if slot is inferred or asserted inline.
18231876
@@ -1850,6 +1903,10 @@ def slot_applicable_range_elements(self, slot: SlotDefinition) -> list[ClassDefi
18501903 :param slot:
18511904 :return: list of element types
18521905 """
1906+ if not slot or not isinstance (slot , SlotDefinition ):
1907+ err_msg = "A SlotDefinition must be provided to generate the slot applicable range elements."
1908+ raise ValueError (err_msg )
1909+
18531910 is_any = False
18541911 range_types = []
18551912 for r in self .slot_range_as_union (slot ):
@@ -1876,6 +1933,10 @@ def slot_range_as_union(self, slot: SlotDefinition) -> list[ElementName]:
18761933 :param slot:
18771934 :return: list of ranges
18781935 """
1936+ if not slot or not isinstance (slot , SlotDefinition ):
1937+ err_msg = "A SlotDefinition must be provided to generate the slot range as union."
1938+ raise ValueError (err_msg )
1939+
18791940 return list ({y .range for y in [slot , * [x for x in [* slot .exactly_one_of , * slot .any_of ] if x .range ]]})
18801941
18811942 def induced_slot_range (self , slot : SlotDefinition , strict : bool = False ) -> set [str | ElementName ]: # noqa: FBT001, FBT002
@@ -1893,6 +1954,9 @@ def induced_slot_range(self, slot: SlotDefinition, strict: bool = False) -> set[
18931954 :return: set of ranges
18941955 :rtype: set[str | ElementName]
18951956 """
1957+ if not slot or not isinstance (slot , SlotDefinition ):
1958+ err_msg = "A SlotDefinition must be provided to generate the induced slot range."
1959+ raise ValueError (err_msg )
18961960
18971961 slot_range = slot .range
18981962 any_of_range = {x .range for x in slot .any_of if x .range }
0 commit comments