22import inspect
33import logging
44
5- from pprint import pformat
65from pathlib import Path
7- from typing import Any , List , Dict , Tuple
8-
9- from chipflow_lib import _parse_config , _ensure_chipflow_root , ChipFlowError
10- from chipflow_lib .platforms import (
11- PACKAGE_DEFINITIONS ,
12- PIN_ANNOTATION_SCHEMA ,
13- top_interfaces ,
14- LockFile ,
15- Package ,
16- PortMap ,
17- Port
18- )
19- from chipflow_lib .config_models import Config
6+ from pprint import pformat
7+
8+ from . import _parse_config , _ensure_chipflow_root , ChipFlowError
9+ from .platforms import top_components , LockFile , PACKAGE_DEFINITIONS
2010
2111# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
2212logger = logging .getLogger (__name__ )
2313
2414
25- def count_member_pins (name : str , member : Dict [str , Any ]) -> int :
26- "Counts the pins from amaranth metadata"
27- logger .debug (
28- f"count_pins { name } { member ['type' ]} "
29- f"{ member ['annotations' ] if 'annotations' in member else 'no annotations' } "
30- )
31- if member ['type' ] == 'interface' and 'annotations' in member \
32- and PIN_ANNOTATION_SCHEMA in member ['annotations' ]:
33- return member ['annotations' ][PIN_ANNOTATION_SCHEMA ]['width' ]
34- elif member ['type' ] == 'interface' :
35- width = 0
36- for n , v in member ['members' ].items ():
37- width += count_member_pins ('_' .join ([name , n ]), v )
38- return width
39- elif member ['type' ] == 'port' :
40- return member ['width' ]
41-
42-
43- def allocate_pins (name : str , member : Dict [str , Any ], pins : List [str ], port_name : str = None ) -> Tuple [Dict [str , Port ], List [str ]]:
44- "Allocate pins based of Amaranth member metadata"
45-
46- if port_name is None :
47- port_name = name
48-
49- pin_map = {}
50-
51- logger .debug (f"allocate_pins: name={ name } , pins={ pins } " )
52- logger .debug (f"member={ pformat (member )} " )
53-
54- if member ['type' ] == 'interface' and 'annotations' in member \
55- and PIN_ANNOTATION_SCHEMA in member ['annotations' ]:
56- logger .debug ("matched IOSignature {sig}" )
57- sig = member ['annotations' ][PIN_ANNOTATION_SCHEMA ]
58- width = sig ['width' ]
59- options = sig ['options' ]
60- pin_map [name ] = {'pins' : pins [0 :width ],
61- 'direction' : sig ['direction' ],
62- 'type' : 'io' ,
63- 'port_name' : port_name ,
64- 'options' : options }
65- logger .debug (f"added '{ name } ':{ pin_map [name ]} to pin_map" )
66- return pin_map , pins [width :]
67- elif member ['type' ] == 'interface' :
68- for k , v in member ['members' ].items ():
69- port_name = '_' .join ([name , k ])
70- _map , pins = allocate_pins (k , v , pins , port_name = port_name )
71- pin_map |= _map
72- logger .debug (f"{ pin_map } ,{ _map } " )
73- return pin_map , pins
74- elif member ['type' ] == 'port' :
75- logger .warning (f"Port '{ name } ' has no IOSignature, pin allocation likely to be wrong" )
76- width = member ['width' ]
77- pin_map [name ] = {'pins' : pins [0 :width ],
78- 'direction' : member ['dir' ],
79- 'type' : 'io' ,
80- 'port_name' : port_name
81- }
82- logger .debug (f"added '{ name } ':{ pin_map [name ]} to pin_map" )
83- return pin_map , pins [width :]
84- else :
85- logging .debug (f"Shouldnt get here. member = { member } " )
86- assert False
87-
88-
8915def lock_pins () -> None :
90- # Get the config as dict for backward compatibility with top_interfaces
91- config_dict = _parse_config ()
16+ config = _parse_config ()
9217
9318 # Parse with Pydantic for type checking and strong typing
94- config_model = Config .model_validate (config_dict )
95-
96- used_pins = set ()
97- oldlock = None
9819
9920 chipflow_root = _ensure_chipflow_root ()
10021 lockfile = Path (chipflow_root , 'pins.lock' )
22+ oldlock = None
23+
10124 if lockfile .exists ():
102- json_string = lockfile .read_text ()
103- oldlock = LockFile .model_validate_json (json_string )
104-
105- print (f"Locking pins: { 'using pins.lock' if lockfile .exists () else '' } " )
106-
107- process = config_model .chipflow .silicon .process
108- package_name = config_model .chipflow .silicon .package
109-
110- if package_name not in PACKAGE_DEFINITIONS :
111- logger .debug (f"Package '{ package_name } is unknown" )
112- package_type = PACKAGE_DEFINITIONS [package_name ]
113-
114- package = Package (package_type = package_type )
115-
116- # Process pads and power configurations using Pydantic models
117- for d in ("pads" , "power" ):
118- logger .debug (f"Checking [chipflow.silicon.{ d } ]:" )
119- silicon_config = getattr (config_model .chipflow .silicon , d , {})
120- for k , v in silicon_config .items ():
121- pin = str (v .loc )
122- used_pins .add (pin )
123-
124- # Convert Pydantic model to dict for backward compatibility
125- v_dict = {"type" : v .type , "loc" : v .loc }
126- port = oldlock .package .check_pad (k , v_dict ) if oldlock else None
127-
128- if port and port .pins != [pin ]:
129- raise ChipFlowError (
130- f"chipflow.toml conflicts with pins.lock: "
131- f"{ k } had pin { port .pins } , now { [pin ]} ."
132- )
133-
134- # Add pad to package
135- package .add_pad (k , v_dict )
136-
137- logger .debug (f'Pins in use: { package_type .sortpins (used_pins )} ' )
138-
139- unallocated = package_type .pins - used_pins
140-
141- logger .debug (f"unallocated pins = { package_type .sortpins (unallocated )} " )
142-
143- # Use the raw dict for top_interfaces since it expects the legacy format
144- _ , interfaces = top_interfaces (config_dict )
145-
146- logger .debug (f"All interfaces:\n { pformat (interfaces )} " )
147-
148- port_map = PortMap ({})
149- # we try to keep pins together for each interface
150- for component , iface in interfaces .items ():
151- for k , v in iface ['interface' ]['members' ].items ():
152- logger .debug (f"Interface { component } .{ k } :" )
153- logger .debug (pformat (v ))
154- width = count_member_pins (k , v )
155- logger .debug (f" { k } : total { width } pins" )
156- old_ports = oldlock .port_map .get_ports (component , k ) if oldlock else None
157- if old_ports :
158- logger .debug (f" { component } .{ k } found in pins.lock, reusing" )
159- logger .debug (pformat (old_ports ))
160- old_width = sum ([len (p .pins ) for p in old_ports .values ()])
161- if old_width != width :
162- raise ChipFlowError (
163- f"top level interface has changed size. "
164- f"Old size = { old_width } , new size = { width } "
165- )
166- port_map .add_ports (component , k , old_ports )
167- else :
168- pins = package_type .allocate (unallocated , width )
169- if len (pins ) == 0 :
170- raise ChipFlowError ("No pins were allocated by {package}" )
171- logger .debug (f"allocated range: { pins } " )
172- unallocated = unallocated - set (pins )
173- _map , _ = allocate_pins (k , v , pins )
174- port_map .add_ports (component , k , _map )
175-
176- newlock = LockFile (process = process ,
177- package = package ,
178- port_map = port_map ,
179- metadata = interfaces )
25+ print ("Reusing current pin allocation from `pins.lock`" )
26+ oldlock = LockFile .model_validate_json (lockfile .read_text ())
27+ logger .debug (f"Old Lock =\n { pformat (oldlock )} " )
28+ logger .debug (f"Locking pins: { 'using pins.lock' if lockfile .exists () else '' } " )
29+
30+ if not config .chipflow .silicon :
31+ raise ChipFlowError ("no [chipflow.silicon] section found in chipflow.toml" )
32+
33+ # Get package definition from dict instead of Pydantic model
34+ package_name = config .chipflow .silicon .package
35+ package_def = PACKAGE_DEFINITIONS [package_name ]
36+ process = config .chipflow .silicon .process
37+
38+ top = top_components (config )
39+
40+ # Use the PackageDef to allocate the pins:
41+ for name , component in top .items ():
42+ package_def .register_component (name , component )
43+
44+ newlock = package_def .allocate_pins (config , process , oldlock )
18045
18146 with open (lockfile , 'w' ) as f :
18247 f .write (newlock .model_dump_json (indent = 2 , serialize_as_any = True ))
@@ -187,9 +52,10 @@ def __init__(self, config):
18752 self .config = config
18853
18954 def build_cli_parser (self , parser ):
55+ assert inspect .getdoc (self .lock ) is not None
19056 action_argument = parser .add_subparsers (dest = "action" )
19157 action_argument .add_parser (
192- "lock" , help = inspect .getdoc (self .lock ).splitlines ()[0 ])
58+ "lock" , help = inspect .getdoc (self .lock ).splitlines ()[0 ]) # type: ignore
19359
19460 def run_cli (self , args ):
19561 logger .debug (f"command { args } " )
0 commit comments