22import inspect
33import logging
44
5- from pprint import pformat
65from pathlib import Path
7- from typing import Any , List , Dict , Tuple
8-
9- from chipflow_lib import _parse_config , _ensure_chipflow_root , ChipFlowError
10- from chipflow_lib .platforms import (
11- PACKAGE_DEFINITIONS ,
12- PIN_ANNOTATION_SCHEMA ,
13- top_interfaces ,
14- LockFile ,
15- Package ,
16- PortMap ,
17- Port
18- )
19- from chipflow_lib .config_models import Config
6+ from pprint import pformat
7+
8+ from . import _parse_config , _ensure_chipflow_root , ChipFlowError
9+ from .platforms import top_components , LockFile , PACKAGE_DEFINITIONS
2010
2111# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
2212logger = logging .getLogger (__name__ )
2313
2414
25- def count_member_pins (name : str , member : Dict [str , Any ]) -> int :
26- "Counts the pins from amaranth metadata"
27- logger .debug (
28- f"count_pins { name } { member ['type' ]} "
29- f"{ member ['annotations' ] if 'annotations' in member else 'no annotations' } "
30- )
31- if member ['type' ] == 'interface' and 'annotations' in member \
32- and PIN_ANNOTATION_SCHEMA in member ['annotations' ]:
33- return member ['annotations' ][PIN_ANNOTATION_SCHEMA ]['width' ]
34- elif member ['type' ] == 'interface' :
35- width = 0
36- for n , v in member ['members' ].items ():
37- width += count_member_pins ('_' .join ([name , n ]), v )
38- return width
39- elif member ['type' ] == 'port' :
40- return member ['width' ]
41-
42-
43- def allocate_pins (name : str , member : Dict [str , Any ], pins : List [str ], port_name : str = None ) -> Tuple [Dict [str , Port ], List [str ]]:
44- "Allocate pins based of Amaranth member metadata"
45-
46- if port_name is None :
47- port_name = name
48-
49- pin_map = {}
50-
51- logger .debug (f"allocate_pins: name={ name } , pins={ pins } " )
52- logger .debug (f"member={ pformat (member )} " )
53-
54- if member ['type' ] == 'interface' and 'annotations' in member \
55- and PIN_ANNOTATION_SCHEMA in member ['annotations' ]:
56- logger .debug ("matched IOSignature {sig}" )
57- sig = member ['annotations' ][PIN_ANNOTATION_SCHEMA ]
58- width = sig ['width' ]
59- options = sig ['options' ]
60- pin_map [name ] = {'pins' : pins [0 :width ],
61- 'direction' : sig ['direction' ],
62- 'type' : 'io' ,
63- 'port_name' : port_name ,
64- 'options' : options }
65- if 'invert' in sig and sig ['invert' ]:
66- pin_map [name ]['invert' ] = sig ['invert' ]
67-
68- logger .debug (f"added '{ name } ':{ pin_map [name ]} to pin_map" )
69- return pin_map , pins [width :]
70- elif member ['type' ] == 'interface' :
71- for k , v in member ['members' ].items ():
72- port_name = '_' .join ([name , k ])
73- _map , pins = allocate_pins (k , v , pins , port_name = port_name )
74- pin_map |= _map
75- logger .debug (f"{ pin_map } ,{ _map } " )
76- return pin_map , pins
77- elif member ['type' ] == 'port' :
78- logger .warning (f"Port '{ name } ' has no IOSignature, pin allocation likely to be wrong" )
79- width = member ['width' ]
80- pin_map [name ] = {'pins' : pins [0 :width ],
81- 'direction' : member ['dir' ],
82- 'type' : 'io' ,
83- 'port_name' : port_name
84- }
85- logger .debug (f"added '{ name } ':{ pin_map [name ]} to pin_map" )
86- return pin_map , pins [width :]
87- else :
88- logging .debug (f"Shouldnt get here. member = { member } " )
89- assert False
90-
91-
9215def lock_pins () -> None :
93- # Get the config as dict for backward compatibility with top_interfaces
94- config_dict = _parse_config ()
16+ config = _parse_config ()
9517
9618 # Parse with Pydantic for type checking and strong typing
97- config_model = Config .model_validate (config_dict )
98-
99- used_pins = set ()
100- oldlock = None
10119
10220 chipflow_root = _ensure_chipflow_root ()
10321 lockfile = Path (chipflow_root , 'pins.lock' )
22+ oldlock = None
23+
10424 if lockfile .exists ():
105- json_string = lockfile .read_text ()
106- oldlock = LockFile .model_validate_json (json_string )
107-
108- print (f"Locking pins: { 'using pins.lock' if lockfile .exists () else '' } " )
109-
110- process = config_model .chipflow .silicon .process
111- package_name = config_model .chipflow .silicon .package
112-
113- if package_name not in PACKAGE_DEFINITIONS :
114- logger .debug (f"Package '{ package_name } is unknown" )
115- package_type = PACKAGE_DEFINITIONS [package_name ]
116-
117- package = Package (package_type = package_type )
118-
119- # Process pads and power configurations using Pydantic models
120- for d in ("pads" , "power" ):
121- logger .debug (f"Checking [chipflow.silicon.{ d } ]:" )
122- silicon_config = getattr (config_model .chipflow .silicon , d , {})
123- for k , v in silicon_config .items ():
124- pin = str (v .loc )
125- used_pins .add (pin )
126-
127- # Convert Pydantic model to dict for backward compatibility
128- v_dict = {"type" : v .type , "loc" : v .loc }
129- port = oldlock .package .check_pad (k , v_dict ) if oldlock else None
130-
131- if port and port .pins != [pin ]:
132- raise ChipFlowError (
133- f"chipflow.toml conflicts with pins.lock: "
134- f"{ k } had pin { port .pins } , now { [pin ]} ."
135- )
136-
137- # Add pad to package
138- package .add_pad (k , v_dict )
139-
140- logger .debug (f'Pins in use: { package_type .sortpins (used_pins )} ' )
141-
142- unallocated = package_type .pins - used_pins
143-
144- logger .debug (f"unallocated pins = { package_type .sortpins (unallocated )} " )
145-
146- # Use the raw dict for top_interfaces since it expects the legacy format
147- _ , interfaces = top_interfaces (config_dict )
148-
149- logger .debug (f"All interfaces:\n { pformat (interfaces )} " )
150-
151- port_map = PortMap ({})
152- # we try to keep pins together for each interface
153- for component , iface in interfaces .items ():
154- for k , v in iface ['interface' ]['members' ].items ():
155- logger .debug (f"Interface { component } .{ k } :" )
156- logger .debug (pformat (v ))
157- width = count_member_pins (k , v )
158- logger .debug (f" { k } : total { width } pins" )
159- old_ports = oldlock .port_map .get_ports (component , k ) if oldlock else None
160- if old_ports :
161- logger .debug (f" { component } .{ k } found in pins.lock, reusing" )
162- logger .debug (pformat (old_ports ))
163- old_width = sum ([len (p .pins ) for p in old_ports .values ()])
164- if old_width != width :
165- raise ChipFlowError (
166- f"top level interface has changed size. "
167- f"Old size = { old_width } , new size = { width } "
168- )
169- port_map .add_ports (component , k , old_ports )
170- else :
171- pins = package_type .allocate (unallocated , width )
172- if len (pins ) == 0 :
173- raise ChipFlowError ("No pins were allocated by {package}" )
174- logger .debug (f"allocated range: { pins } " )
175- unallocated = unallocated - set (pins )
176- _map , _ = allocate_pins (k , v , pins )
177- port_map .add_ports (component , k , _map )
178-
179- newlock = LockFile (process = process ,
180- package = package ,
181- port_map = port_map ,
182- metadata = interfaces )
25+ print ("Reusing current pin allocation from `pins.lock`" )
26+ oldlock = LockFile .model_validate_json (lockfile .read_text ())
27+ logger .debug (f"Old Lock =\n { pformat (oldlock )} " )
28+ logger .debug (f"Locking pins: { 'using pins.lock' if lockfile .exists () else '' } " )
29+
30+ if not config .chipflow .silicon :
31+ raise ChipFlowError ("no [chipflow.silicon] section found in chipflow.toml" )
32+
33+ # Get package definition from dict instead of Pydantic model
34+ package_name = config .chipflow .silicon .package
35+ package_def = PACKAGE_DEFINITIONS [package_name ]
36+ process = config .chipflow .silicon .process
37+
38+ top = top_components (config )
39+
40+ # Use the PackageDef to allocate the pins:
41+ for name , component in top .items ():
42+ package_def .register_component (name , component )
43+
44+ newlock = package_def .allocate_pins (config , process , oldlock )
18345
18446 with open (lockfile , 'w' ) as f :
18547 f .write (newlock .model_dump_json (indent = 2 , serialize_as_any = True ))
@@ -190,9 +52,10 @@ def __init__(self, config):
19052 self .config = config
19153
19254 def build_cli_parser (self , parser ):
55+ assert inspect .getdoc (self .lock ) is not None
19356 action_argument = parser .add_subparsers (dest = "action" )
19457 action_argument .add_parser (
195- "lock" , help = inspect .getdoc (self .lock ).splitlines ()[0 ])
58+ "lock" , help = inspect .getdoc (self .lock ).splitlines ()[0 ]) # type: ignore
19659
19760 def run_cli (self , args ):
19861 logger .debug (f"command { args } " )
0 commit comments