diff --git a/README.md b/README.md index 47fe675..770c7c7 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,11 @@ Roger Morales-Monge, student, Tecnológico de Costa Rica 3. [Cloning benchmarks](#cloning-benchmarks) 4. [Executing Demo](#executing-demo) 5. [Using AxLS](#using-axls) - 1. [Parsing a netlist](#parsing-a-netlist) - 2. [Deleting a node](#deleting-a-node) - 3. [Simulation and Error Estimation](#simulation-and-error-estimation) + 1. [CLI and simplified API usage](#cli-and-simplified-api-usage) + 2. [Library usage](#library-usage) + 1. [Parsing a netlist](#parsing-a-netlist) + 2. [Deleting a node](#deleting-a-node) + 3. [Simulation and Error Estimation](#simulation-and-error-estimation) 6. [ALS Algorithms](#als-algorithms) 1. [Pruning Algorithms](#pruning-algorithms) - [InOuts](#inouts) @@ -179,7 +181,44 @@ Mean Error Distance of approximate circuit with node _101_ deleted: 3.979 ## Using AxLS -### Parsing a netlist +AxLS can be used in three ways: through a CLI, programmatically by passing configuration parameters to execute the ALS methods in a simplified way, or by calling the ALS methods directly, library style. + +### CLI and simplified API usage + +To print the help, run: + +```sh +python . -h +``` + +The CLI has 2 subcommands: + +- `run` for executing an ALS method. +- `generate` for generating the datasets that an ALS execution uses for simulation or (in the case of ML methods) training. + +Here's an example that generates a dataset and then executes an ALS method: + +```sh +# Requires previously having cloned the ALS-benchmark-circuits repo (see cloning benchmarks section) +CIRCUIT=ALS-benchmark-circuits/KS_16b/KS_16b.v +# We generate a simulation dataset of 10k possible input/output pairs for the KS_16b circuit. +# Uses a uniform distribution of inputs by default. +python . 
generate $CIRCUIT test_dataset 10000 +# Run the inconst method, calculating the MRED, circuit area and execution time metrics, +# accepting at most 20% introduced error, separating 10% of the dataset for validation, +# and pruning 10 nodes per iteration. +python . run inconst $CIRCUIT test_dataset mred time area --error 0.2 --validation 0.1 --prunes-per-iteration 10 +``` + +The tool can also be used programmatically with an interface very similar to the CLI, by calling the `run` +method from `runner.py` directly and passing in an `ApproxSynthesisConfig` configuration object. + +### Library usage + +This section introduces some basic concepts for manipulating a netlist directly, +which is a key part of employing the different ALS methods directly. + +#### Parsing a netlist 1. First, import the `Circuit` class: @@ -250,7 +289,7 @@ Using this node you can implement your own pruning algorithms. Because ElementTr -### Deleting a node +#### Deleting a node 1. The first example method we provide to delete nodes is quite simple, just delete a node based on its name. You can do it in two different ways: @@ -269,7 +308,7 @@ our_circuit.delete("_101_") When you set the attribute `delete` of a node to `yes`, it means that this node will be deleted the next time our circuit is saved in the filesystem. **The node will remain in the xml tree!** (just in case we need to revert a deletion). -### Simulation and Error Estimation +#### Simulation and Error Estimation Simulation stage and error estimation are executed inside one method called `simulate_and_compute_error`. But first, in order to execute a simulation and calculate its error you need to provide: @@ -336,7 +375,7 @@ This framework currently provides 2 kinds of ALS algorithms: These algorithms suggest which nodes to delete based on circuit data or heuristics. 
-TODO: Missing documentation on `ccarving` and `glpsignificance` +**TODO: Missing documentation on `ccarving` and `glpsignificance`** #### InOuts @@ -598,25 +637,26 @@ introducing around ~23% error. Files and Folders description: -| Name | Description | Used | -| ------------------- | ------------------------------------------------------------ | ------ | -| prunning_algorithms | Folder containing pruning techniques implementations. | | -| `inouts.py` | Contains the implementation of `GetInputs` and `GetOutputs` example pruning methods. | | -| `probprun.py` | Contains the implementation of a pseudo Probabilistic Pruning method. `GetOneNode` is a python generator. It will retrieve one node to delete each time it is called. | | -| templates | Folder containing some libraries and scripts used for synthesis. | | -| `NanGate15nm.lib` | | | -| `NanGate15nm.v` | | | -| `synth.ys` | Script to synthesize a circuit using yosys. | | -| `__main__.py` | It executes the tool using the arguments from the command line. **Still in progress**. | **No** | -| `barcas.py` | Is the Pruning Implementation using the InOuts techniques. | **NO** | -| `circuit.py` | Object that represents a circuit as a XML tree. Receives a rtl and a library in order to build the circuit and be able to simulate it. | | -| `circuiterror.py` | Compares two outputs and computes different error metrics. | | -| `demo.py` | This file is a complete example of how the library should be used. | | -| `netlist.py` | This class parses, extracts and represents the circuit from rtl into an object understandable by python. | | -| `poisonoak.config` | This is going to be used along with `__main__.py` in order to execute poisonoak as an app, and not as a library. | **No** | -| `poisonoak.help` | Contains the menu and tool description of the poison oak app. | **No** | -| `synthesis.py` | Executes the synthesis script (in our case yosys) and clean the intermediate files generated. At the end returns the path of the netlist. 
| | -| `technology.py` | This class parses, extracts and represents the technology library file into an object understandable by python. | | -| `test.py` | This class implements some unit tests for the poison oak library. **Not implemented yet**. | **No** | -| `utils.py` | Some functions not related with any other class but useful. | | +| Name | Description | +| ------------------- | ------------------------------------------------------------ | +| `pruning_algorithms/` | Folder containing pruning techniques implementations. | +| `pruning_algorithms/inouts.py` | Contains the implementation of `GetInputs` and `GetOutputs` example pruning methods. | +| `pruning_algorithms/probprun.py` | Contains the implementation of a pseudo Probabilistic Pruning method. `GetOneNode` is a python generator. It will retrieve one node to delete each time it is called. | +| `ml_algorithms/` | Folder containing ML techniques implementations. | +| `ml_algorithms/decision_tree` | Contains the implementation of the Decision Tree technique through the `DecisionTreeCircuit` class. | +| `templates/` | Folder containing some libraries and scripts used for synthesis. | +| `templates/NanGate15nm.lib` | Technology file from Nangate. | +| `templates/NanGate15nm.v` | | +| `synth.ys` | Script to synthesize a circuit using yosys. | +| `__main__.py` | It executes the tool using the arguments from the command line. | +| `configuration.py` | Contains a configuration class for executing an ALS flow. Used by the CLI, but can be used by other scripts to do executions programmatically without delving into the library's details. | +| `runner.py` | Contains a `run` method which accepts a configuration class in order to execute one of the ALS methods. | +| `circuit.py` | Object that represents a circuit as an XML tree. Receives an RTL file and a library in order to build the circuit and be able to simulate it. | +| `circuiterror.py` | Compares two outputs and computes different error metrics. 
| +| `demo.py` | This file is a complete example of how the library should be used. | +| `netlist.py` | This class parses, extracts and represents the circuit from rtl into an object understandable by python. | +| `synthesis.py` | Executes the synthesis script (in our case yosys) and clean the intermediate files generated. At the end returns the path of the netlist. | +| `technology.py` | This class parses, extracts and represents the technology library file into an object understandable by python. | +| `utils.py` | Some functions not related with any other class but useful. | +| `test.py` | Currently unused file, meant to be used for unit tests. Out of date. | diff --git a/__main__.py b/__main__.py index a7b891a..5356666 100644 --- a/__main__.py +++ b/__main__.py @@ -1,56 +1,241 @@ +import argparse +import os +from circuit import Circuit +from configuration import ApproxSynthesisConfig, AlsMethod, Metric +from runner import run -from os import path -import sys - - -CONFIG_FILE = "poisonoak/poisonoak.config" -HELP_FILE = "poisonoak/poisonoak.help" - - -def trim(text): - ''' - Remove the spaces between text - - Parameters - ---------- - text : string - text with white spaces - - Returns - ------- - string - text without white spaces - ''' - return text.replace(" ", "").replace("\n","") - - -def read_config(): - ''' - Read the config file to check everything is there - - Returns - ------- - boolean - True if all the variables are defined and correct - ''' - if (path.exists(CONFIG_FILE)): - with open(CONFIG_FILE) as config: - for line in config.readlines(): - if len(line) > 10: - var, value = trim(line).split("=") - if var == "RTL" and path.exists(value): - print (value, "CHECK") - else: - print(f"Option {var} is incorrect") - return False - return True - else: - print("Config File does not exists!") - - -for arg in sys.argv: - if (arg == "-h" or arg == "--help"): - with open(HELP_FILE) as help: - print(help.read()) - else: - read_config() +# The tech library is 
hardcoded for the following reasons: +# - Ease of use: This way users don't have to provide a tech library which most +# of the time would be this same one. +# - Tool limitations: +# - AxLS only provides this tech library. +# - The Circuit class accepts a tech library "name" and assumes that .v and .lib +# files by that name exist in the templates/ directory of AxLS. +# - If we want to let users provide custom tech libraries through optional +# flags we'll need to make Circuit accept arbitrary paths to the needed tech +# files. +TECH = "NanGate15nm" + + +def parse_generate(value): + try: + return int(value) + except ValueError: + try: + return float(value) + except ValueError: + raise argparse.ArgumentTypeError( + f"Invalid generate_dataset value: {value}. Must be int or float." + ) + +def parse_positive_integer(value): + try: + n = int(value) + except ValueError: + raise argparse.ArgumentTypeError( + f"Invalid value: {value}. Must be a positive integer." + ) + if n < 1: + raise argparse.ArgumentTypeError(f"Invalid value: {value}. Should be a positive integer, x >= 1.") + return n + + + +def main(): + parser = argparse.ArgumentParser( + description="AxLS CLI. Provides a simplified interface to the package's functionality." + ) + + subparsers = parser.add_subparsers( + title="subcommands", dest="subcommand", required=True + ) + + run_parser = subparsers.add_parser( + "run", + help="Run Approximate Logic Synthesis on a circuit using one of the provided methods.", + ) + + run_arguments(run_parser) + + generate_parser = subparsers.add_parser( + "generate", help="Generate a dataset that can be used with the 'run' command." 
+ ) + + generate_arguments(generate_parser) + + args = parser.parse_args() + + if args.subcommand == "run": + metrics: list[str] = args.metrics + if len(metrics) == 0: + metrics = [Metric.MEAN_RELATIVE_ERROR_DISTANCE.value, Metric.ALS_TIME.value] + + if not os.path.isfile(args.circuit): + parser.error( + f"The path given for the circuit '{args.circuit}' does not exist." + ) + + try: + saif = args.saif or "" + + circuit = Circuit(args.circuit, TECH, saif) + config = ApproxSynthesisConfig( + method=args.method, + circuit=circuit, + metrics=metrics, + dataset=args.dataset, + resynthesis=args.resynthesis, + error=args.error, + validation=args.validation, + max_iters=args.max_iters, + prunes_per_iteration=args.prunes_per_iteration, + max_depth=args.max_depth, + one_tree_per_output=args.one_tree_per_output, + show_progress=args.show_progress, + csv=args.csv, + ) + except ValueError as e: + parser.error(str(e)) + + print("Configuration loaded successfully") + print(config) + + (results, validation_results) = run(config) + + print("\n---- Results -----") + for metric in config.metrics: + value = results[metric] + print(f"{metric.value}: {metric.to_user_friendly_display(value)}") + + if validation_results: + print("\n---- Results on Validation Set -----") + for metric, value in validation_results.items(): + print(f"{metric.value}: {metric.to_user_friendly_display(value)}") + + elif args.subcommand == "generate": + generate_dataset(args) + + +def run_arguments(run_parser): + """ + Adds the arguments to the 'run' subcommand parser + """ + + run_parser.add_argument( + "method", + type=str, + choices=[m.value for m in AlsMethod], + help="Approximation method.", + ) + run_parser.add_argument("circuit", help="Verilog circuit file.") + run_parser.add_argument("dataset", help="Dataset file to run simulations with.") + run_parser.add_argument( + "metrics", + nargs="*", + choices=[m.value for m in Metric], + # TODO: Add docs about what each metric is + help="Metrics to 
calculate, defaults to 
mred and time.", + ) + run_parser.add_argument( + "--saif", + help="SAIF file for the circuit. Used by 'probrun' method. If not provided, one will be generated during execution.", + ) + run_parser.add_argument( + "--resynthesis", action="store_true", help="If provided will use resynthesis." + ) + run_parser.add_argument( + "--error", + type=float, + help="Maximum error threshold to stop iterations. (0 < x <= 1). The error used is Mean Relative Error Distance.", + ) + run_parser.add_argument( + "--validation", + type=float, + help="Proportion of the dataset to allocate for validation (0 <= x < 1).", + ) + run_parser.add_argument( + "--max-iters", + type=int, + help="Maximum number of iterations for iterative methods.", + ) + run_parser.add_argument( + "--prunes-per-iteration", + type=parse_positive_integer, + default=1, + help="Number of prunes carried out each iteration. Affects pruning methods except ccarving since it already prunes multiple nodes at a time.", + ) + run_parser.add_argument( + "--max-depth", type=int, help="Max depth for decision_tree method" + ) + run_parser.add_argument( + "--one-tree-per-output", + action="store_true", + help="Use one tree per output for decision_tree", + ) + run_parser.add_argument( + "--show-progress", + action="store_true", + help="Show the progress of simulations executed for the ALS.", + ) + run_parser.add_argument( + "--csv", + type=str, + help="""Path to a file to save the output in csv format. + If the file doesn't exist, it will be created, if it exists it will be appended to. + + The output will be given as a single line with the following columns: + method, circuit, resynthesis, error, max_iters, max_depth, one_tree_per_output, metric1, metric2, ... + + - bool values are stored as "True" or "False". + - optional fields will just be left blank if not provided. + - if 'validation' flag is given, error metrics will be given as: metric1, v_metric1, metric2, v_metric2, ... 
+ """, + ) + + +def generate_arguments(generate_parser): + """ + Adds the arguments to the 'generate' subcommand parser + """ + generate_parser.add_argument("circuit", help="Verilog circuit file.") + generate_parser.add_argument("dataset", help="Dataset file to generate.") + generate_parser.add_argument( + "--distribution", + default="uniform", + choices=["uniform", "gaussian", "triangular", "shuffle_bag"], + help="Distribution used to generate the input values. Defaults to uniform.", + ) + generate_parser.add_argument( + "size", + type=parse_generate, + help="""The size of the dataset. + Accepts an int (x > 0, a set amount of samples), or a float (0 < x <= 1, a fraction of the total amount of inputs possible). + Note that for big circuits, like those with 32 input bits or more, generating a large fraction of the possible inputs might take a long time, due to the amount of possible inputs growing exponentially (2^n). + """, + ) + + +def generate_dataset(args: argparse.Namespace): + circuit = Circuit(args.circuit, TECH) + + size = args.size + if isinstance(size, int): + if not size > 0: + raise argparse.ArgumentTypeError( + f"Dataset size must be greater than 0: {size}" + ) + + if isinstance(size, float): + if not (0 < size <= 1.0): + raise argparse.ArgumentTypeError( + f"Dataset size as a fraction of total inputs must satisfy 0 < x <= 1.0: {size}" + ) + + max_inputs = 2 ** (len(circuit.inputs)) + size = round(max_inputs * size) + + circuit.generate_dataset(args.dataset, size, args.distribution) + + +if __name__ == "__main__": + main() diff --git a/barcas.py b/barcas.py deleted file mode 100644 index 5f7da2d..0000000 --- a/barcas.py +++ /dev/null @@ -1,80 +0,0 @@ -from copy import deepcopy -from circuit import Circuit -from pruning_algorithms.inouts import GetInputs, GetOutputs - -BASE = "circuits/ripple.carry.4b/" -TOP = "RCA_3_0" -MET = "wce" - -RTL = f"{BASE}{TOP}.v" -TB = f"{BASE}{TOP}_tb.v" -SAIF = f"{BASE}{TOP}.saif" -ORIG = f"{BASE}output0.txt" -APPR = 
f"{BASE}output.txt" - -def log (msg): - with open(f"{BASE}log.txt", "a+") as 
f: - f.write(msg) - print(msg) - -def barcas(circuit, max_error): - - log(f"Pruning circuit for Max Error of: {max_error}\n") - - actual_error = 0 - - last_stable_circuit = deepcopy(circuit) - modified_circuit = deepcopy(circuit) - - for bit in range (0, 4): - - for type in ["i","o"]: - - if type == "i": - inputs = [f"in1[{bit}]",f"in2[{bit}]"] - nodes = GetInputs(modified_circuit.netl_root, inputs) - else: - outputs = [f"out[{bit}]"] - nodes = GetOutputs(modified_circuit.netl_root, outputs) - - #print(nodes) - - for node in nodes: - modified_circuit.delete(node.attrib["var"]) - - obtained_error = modified_circuit.simulate(TB, MET, ORIG, APPR) - - nvar = node.attrib["var"]; - - msg = f"Node Deleted: {nvar}, error({MET}): {obtained_error}\n" - - log(msg); - - if obtained_error <= max_error: - last_stable_circuit.delete(node.attrib["var"]) - actual_error = obtained_error - else: - modified_circuit.undodelete(node.attrib["var"]) - - - if (actual_error == max_error): - break - - final_error = last_stable_circuit.simulate(TB, MET, ORIG, APPR, clean=False) - - last_stable_circuit.show(show_deletes=True) - input("Press enter...") - - msg = f"[FINAL] Expected: {max_error}, Obtained: {final_error}\n" - log(msg) - -our_circuit = Circuit(RTL, "NanGate15nm") - -for error in [8]: #range (10, 101, 10): - - our_circuit.exact_output(TB) - barcas(our_circuit, error) - ''' - x = threading.Thread(target=barcas, args=(our_circuit, error,)) - x.start() - ''' diff --git a/circuit.py b/circuit.py index 7e7ab2c..c206649 100644 --- a/circuit.py +++ b/circuit.py @@ -1,9 +1,9 @@ import os import re +import datetime from graphviz import Digraph from os import path, remove, system, rename -from random import randint from re import findall import xml.etree.ElementTree as ET @@ -37,7 +37,7 @@ class Circuit: ''' - def __init__(self, rtl, tech, saif = ""): + def __init__(self, rtl, tech, saif = "", topmodule = None): ''' Parse a rtl circuit into a xml tree using a specific technology library @@ 
-49,14 +49,21 @@ def __init__(self, rtl, tech, saif = ""): path to the technology file saif : string path to the saif file + topmodule : string (optional) + name of the circuit module that we want to synthesize, if not + provided it will be inferred from the rtl filename ''' self.rtl_file = rtl self.tech_file = tech - self.topmodule = rtl.split('/')[-1].replace(".v","") - self.netl_file = synthesis (rtl, tech, self.topmodule) - self.technology = Technology(tech) + + if not topmodule: + topmodule = rtl.split('/')[-1].replace(".v","") + + self.topmodule = topmodule + self.netl_file = synthesis (rtl, self.tech_file, self.topmodule) + self.technology = Technology(self.tech_file) # extract the usefull attributes of netlist netlist = Netlist(self.netl_file, self.technology) self.netl_root = netlist.root @@ -138,10 +145,10 @@ def is_node_deletable(self, node): Returns true if a node can be deleted, returns false if the node should be assigned a constant instead. - A node can be deleted if all its children nodes will be deleted as - well. If a node has children nodes or connects directly to an output of - the circuit, then the funcction will return false and the node should - be replaced with a constant. + A node can be deleted only if all its child nodes are also being + deleted. If the node has children, is connected directly to a circuit + output, or is itself a circuit output, the function returns false and + the node should be replaced with a constant. 
Parameters ---------- @@ -174,7 +181,9 @@ def is_node_deletable(self, node): some_children_not_deleted = len(node_children_to_be_deleted) < len(node_children) node_has_outputs = connects_to_output or some_children_not_deleted - node_can_be_deleted = not node_has_outputs + node_is_output = wire in self.outputs + + node_can_be_deleted = not node_has_outputs and not node_is_output return node_can_be_deleted @@ -274,15 +283,15 @@ def get_wires_to_be_deleted(self): - def write_to_disk (self, filename=""): + def write_to_disk (self, filepath): ''' Write the xml circuit into a netlist file considering the nodes to be deleted (marked with an attribute delete='yes') - Returns - ------- - string - path of the recently created netlist + Parameters + ---------- + filepath: string + Full file path to the generated file. ''' def format_io(node, io): @@ -292,9 +301,6 @@ def format_io(node, io): nodes_to_delete = self.get_nodes_to_delete() to_be_deleted, to_be_assigned = self.get_wires_to_be_deleted() - filename = filename if filename != "" else str(randint(9999,999999)) - filepath = f"{self.output_folder}{path.sep}{filename}.v" - with open(filepath, 'w') as netlist_file: def writeln(file, text): @@ -310,15 +316,15 @@ def writeln(file, text): for wire in self.get_circuit_wires(): if wire not in to_be_deleted: writeln(netlist_file, f"\twire {wire};") - used_outputs=[] + used_ports=[] for output in self.raw_outputs: - if output not in used_outputs: + if output not in used_ports: writeln(netlist_file, "\t" + output) - used_outputs.append(output) - for output in self.raw_inputs: - if output not in used_outputs: - writeln(netlist_file, "\t" + output) - used_outputs.append(output) + used_ports.append(output) + for input in self.raw_inputs: + if input not in used_ports: + writeln(netlist_file, "\t" + input) + used_ports.append(input) for node_var in self.get_circuit_nodes(): if node_var not in nodes_to_delete: @@ -339,7 +345,6 @@ def writeln(file, text): writeln(netlist_file, assign) 
writeln(netlist_file, "endmodule") - return filepath def show (self, filename=None, show_deletes=False, view=True, format="png"): @@ -493,25 +498,27 @@ def exact_output (self, testbench, output_file): ''' - name = get_name(5) - rtl = self.write_to_disk(name) + rtl = f"{self.output_folder}/{get_name(5)}.v" + self.write_to_disk(rtl) top = self.topmodule current_dir=os.path.dirname(__file__) - tech = f"{current_dir}/templates/" + self.tech_file - out = self.output_folder + tech = f"{current_dir}/templates/{self.tech_file}" - """Better to temporarily change cwd when executing iverilog""" - cwd=os.getcwd() - os.chdir(current_dir) + # The executable is run from the testbench folder, because the path to the + # dataset is relative to the testbench file. + testbench = os.path.abspath(testbench) + out = os.path.dirname(testbench) # - - - - - - - - - - - - - - - Execute icarus - - - - - - - - - - - - - # iverilog -l tech.v -o executable testbench.v netlist.v - kon = f"iverilog -l \"{tech}.v\" -o \"{out}/{top}\" {testbench} \"{rtl}\"" + kon = f"iverilog -l \"{tech}.v\" -o \"{out}/{top}\" \"{testbench}\" \"{rtl}\"" system(kon) # - - - - - - - - - - - - - Execute the testbench - - - - - - - - - - - - system(f"cd \"{out}\"; ./{top}") + cwd=os.getcwd() + os.chdir(out) + system(f"./{top}") os.chdir(cwd) @@ -520,9 +527,52 @@ def exact_output (self, testbench, output_file): rename(out + "/output.txt", output_file) - return - def simulate_and_compute_error (self, testbench, metric, exact_output, new_output): + def simulate(self, testbench, approximate_output): + """ + Simulates the circuit tree with deletions. + Creates an executable using icarus, and then executes it to obtain the + output of the testbench + + Parameters + ---------- + testbench : string + path to the testbench file + approximate_output : string + Path to the output file where simulation results will be written. + The user must provide the full file path and name. If the file + exists, it will be overwritten. 
+ """ + rtl = f"{self.output_folder}/{get_name(5)}.v" + self.write_to_disk(rtl) + + top = self.topmodule + current_dir = os.path.dirname(__file__) + tech = f"{current_dir}/templates/{self.tech_file}" + + # The executable is run from the testbench folder, because the path to the + # dataset is relative to the testbench file. + testbench = os.path.abspath(testbench) + out = os.path.dirname(testbench) + + # - - - - - - - - - - - - - - - Execute icarus - - - - - - - - - - - - - + # iverilog -l tech.v -o executable testbench.v netlist.v + kon = f'iverilog -l "{tech}.v" -o "{out}/{top}" "{testbench}" "{rtl}"' + system(kon) + + # - - - - - - - - - - - - - Execute the testbench - - - - - - - - - - - + cwd = os.getcwd() + os.chdir(out) + system(f"./{top}") + + os.chdir(cwd) + + remove(rtl) + remove(f"{out}/{top}") + + rename(out + "/output.txt", approximate_output) + + def simulate_and_compute_error (self, testbench, exact_output, new_output, metric): ''' Simulates the actual circuit tree (with deletions) Creates an executable using icarus, end then execute it to obtain the @@ -532,9 +582,6 @@ def simulate_and_compute_error (self, testbench, metric, exact_output, new_outpu ---------- testbench : string path to the testbench file - metric : string - equation to compute the error - options med, wce, wcre,mred, msed exact_output : string Path to the output file of the original exact circuit to compare against. This file can be created with the `exact_output` method. @@ -542,44 +589,19 @@ def simulate_and_compute_error (self, testbench, metric, exact_output, new_outpu Path to the output file where simulation results will be written. The user must provide the full file path and name. If the file exists, it will be overwritten. 
- clean : bool - if true, deletes all the generated files + metric : string + equation to compute the error + options med, wce, wcre,mred, msed Returns ------- float error of the current circuit tree ''' - - - name = get_name(5) - rtl = self.write_to_disk(name) - - top = self.topmodule - tech = "./templates/" + self.tech_file - out = self.output_folder - - """Better to temporarily change cwd when executing iverilog""" - cwd=os.getcwd() - current_dir=os.path.dirname(__file__) - os.chdir(current_dir) - - # - - - - - - - - - - - - - - - Execute icarus - - - - - - - - - - - - - - # iverilog -l tech.v -o executable testbench.v netlist.v - kon = f"iverilog -l \"{tech}.v\" -o \"{out}/{top}\" {testbench} \"{rtl}\"" - system(kon) - - # - - - - - - - - - - - - - Execute the testbench - - - - - - - - - - - - system(f"cd \"{out}\"; ./{top}") - os.chdir(cwd) - - rename(out + "/output.txt", new_output) + self.simulate(testbench, new_output) error = compute_error(metric, exact_output, new_output) - remove(rtl) - remove(f"{out}/{top}") - return error def generate_dataset(self, filename, samples, distribution='uniform', **kwargs): @@ -603,8 +625,27 @@ def generate_dataset(self, filename, samples, distribution='uniform', **kwargs): "gaussian" or "normal" for a normal distribution. "uniform" or "rectangular" for a uniform distribution. "triangular" for a triangular distribution. + "shuffle_bag": It's "uniform-like", but avoids repeating values + until the full dataset has been used, employing a + shuffle bag algorithm. TODO: Add more distributions + shuffle_bag WARNING ⚠️: + ----------------------- + This mode generates a complete list of all possible input + combinations in memory, then shuffles and samples from it. It + guarantees no repeats, but is very memory intensive. + + For circuits with: + - 32 inputs: needs ~4.3 billion entries (~137GB RAM) + - 16 inputs: only ~65,536 entries (~2MB RAM) + + This is regardless of how many samples you're actually + grabbing! 
Even if you only grab 1 sample the full dataset will + be instantiated. + + Use only for small circuits (preferably under 16 inputs). + **kwargs: (optional) median: int @@ -639,18 +680,40 @@ def generate_dataset(self, filename, samples, distribution='uniform', **kwargs): else: inputs_info[name]=1 + format=f'0{bitwidth}b' if format=='b' else format #ensure right number of bits if binary + '''Iterate inputs''' + if distribution == "shuffle_bag": + # Shuffle bag needs to generate all the inputs together to ensure + # avoiding repetition of the circuit's inputs as a whole. + # + # This means that if the circuit has 2 inputs of 4 bits, we don't + # want to generate all possible 4 bit combinations for each input. + # We want to generate all possible 8 bit combinations and then split + # those into 2 4 bit inputs. + total_bits = sum(inputs_info.values()) + inputs = get_random(total_bits, distribution, samples, **kwargs) + for input in inputs: + shift_right = total_bits + row = [] + for bitwidth in inputs_info.values(): + shift_right -= bitwidth + mask = (1<< bitwidth)-1 + value = (input >> shift_right) & mask + row.append(f'{value:{format}}') + data.append(row) + + else: + for bitwidth in inputs_info.values(): + rows=get_random(bitwidth,distribution,samples, **kwargs) + data.append([f'{i:{format}}' for i in rows]) + data=list(zip(*data)) # Transpose data see: https://stackoverflow.com/questions/10169919/python-matrix-transpose-and-zip - for bitwidth in inputs_info.values(): - rows=get_random(bitwidth,distribution,samples, **kwargs) - format=f'0{bitwidth}b' if format=='b' else format #ensure right number of bits if binary - data.append([f'{i:{format}}' for i in rows]) - data=list(zip(*data)) # Transpose data see: https://stackoverflow.com/questions/10169919/python-matrix-transpose-and-zip np.savetxt(filename,data,fmt='%s') return - def write_tb(self, filename, dataset_file, iterations=None, timescale= '10ns / 1ps', delay=10, format='h', dump_vcd=False): + def 
write_tb(self, filename, dataset_file, iterations=None, timescale= '10ns / 1ps', delay=10, format='h', dump_vcd=None, show_progress=True): ''' Writes a basic testbench for the circuit. @@ -675,6 +738,11 @@ def write_tb(self, filename, dataset_file, iterations=None, timescale= '10ns / 1 'o' for octal 'd' for decimal 'b' for binary + show_progress: bool, default = True + Whether the testbench should print its progress as it executes. + dump_vcd (optional): str + If provided, executing the testbench will create a vcd file at the + given path. Returns ------- @@ -751,9 +819,14 @@ def write_tb(self, filename, dataset_file, iterations=None, timescale= '10ns / 1 f'\n' \ '''Initial statement''' - text= f'{text}initial begin\n $display("-- Beginning Simulation --");\n\n' + text= f'{text}initial begin\n' + + if show_progress: + text += '$display("-- Beginning Simulation --");\n\n' + if dump_vcd: - text=f'{text} $dumpfile("./{self.topmodule}.vcd");\n' \ + relative_vcd_path = os.path.relpath(dump_vcd, start=os.path.dirname(filename)) + text=f'{text} $dumpfile("{relative_vcd_path}");\n' \ f' $dumpvars(0,{self.topmodule}_tb);\n' relative_dataset_path = os.path.relpath(dataset_file, start=os.path.dirname(filename)) @@ -774,13 +847,16 @@ def write_tb(self, filename, dataset_file, iterations=None, timescale= '10ns / 1 f' #{delay}\n' \ f' $fwrite(file, "' for o in range(len(outputs_info.keys())): - text=f'{text}%d\\n ' - text=f'{text}",' - for o in list(outputs_info.keys())[::-1][0:-1]: + text=f'{text}%d ' + text=f'{text}\\n",' + for o in list(outputs_info.keys())[0:-1]: text= f'{text}{o},' - text= f'{text}{list(outputs_info.keys())[0]});\n'\ - + f' $display("-- Progress: %d/{iterations} --",i+1);\n'\ - f' end\n' \ + text= f'{text}{list(outputs_info.keys())[-1]});\n' + + if show_progress: + text +=f' $display("-- Progress: %d/{iterations} --",i+1);\n' + + text = f'{text}end\n' \ f' $fclose(file);\n' \ f' $fclose(mem);\n' \ f' $finish;\n' \ @@ -793,6 +869,284 @@ def write_tb(self, 
filename, dataset_file, iterations=None, timescale= '10ns / 1 return + def generate_saif_from_vcd( + self, saif: str, vcd_file_path: str, verbose: bool = False + ) -> None: + """ + Generates a SAIF file from a vcd file. A vcd file can be created by + running a simulation with a testbench that was created by + `write_tb` with a `dump_vcd` parameter. + + The SAIF file is then parsed and the netlist annotated with execution + data. + + Parameters + ---------- + saif: string + Path to the saif file to generate. + The user must provide the full file path and name. If the file + exists, it will be overwritten. + vcd_file_path: string + Path to the vcd file. + verbose: bool + Whether to print verbose output + """ + saifversion = "2.0" + direction = "backward" + design = self.topmodule + vendor = "AxPy Inc" + program_name = "open_vcd2saif" + version = "v0" + divider = "/ " + timescale = "1 ps" + + # 1st pass: get variables + var_list = [] + level = 0 + + count = 0 + total = 0 + + def file_read(filename): + for row in open(filename, "r"): + yield row.split("\n")[0] + + vcd_file = file_read(vcd_file_path) + + for line in vcd_file: + search = re.search(r"\$scope", line) + if search is not None: + ls = line.split() + parent = ls[2] + level += 1 + continue + + search = re.search(r"\$upscope", line) + if search is not None: + level -= 1 + continue + + search = re.search(r"\$var", line) + if search is not None: + ls = line.split() + name = ls[4] + alias = ls[3] + var_len = int(ls[2]) + m = re.findall(r"\d+", ls[5]) + flag_mult = 0 + if len(m) == 2: + n0 = int(m[1]) + flag_mult = 1 + elif len(m) == 1: + n0 = int(m[0]) + flag_mult = 1 + else: + n0 = 1 + + if var_len == 1: + var_list.append( + { + "name": name, + "alias": alias, + "parent": parent, + "level": level, + "len": 1, + "bit_index": n0, + "multi_bit": flag_mult, + "high": 0, + "low": 0, + "x": 0, + "ig": 0, + "last": "2", + "toggle": 0, + } + ) + else: + for i in range(var_len): + var_list.append( + { + "name": 
name, + "alias": alias, + "parent": parent, + "level": level, + "len": var_len, + "bit_index": i, + "multi_bit": flag_mult, + "high": 0, + "low": 0, + "x": 0, + "ig": 0, + "last": "2", + "toggle": 0, + } + ) + continue + if verbose: + count += 1 + print(f"Pass #1: {count}/{total}") + + # 2nd pass: get values + time_step = 0 + last_step = 0 + + count = 0 + vcd_file = file_read(vcd_file_path) + + for line in vcd_file: + if line != "": + if line[0] == "#": + time_step = int(line[1:]) + + # print('Time step: %d' % time_step) + time_diff = time_step - last_step + for var in var_list: + if var["last"] == "1": + var["high"] += time_diff + elif var["last"] == "0": + var["low"] += time_diff + elif var["last"] == "x": + var["x"] += time_diff + last_step = time_step + + elif line[0] == "b" and line[1] != "x": + val, alias = line.split() + val_len = len(val[1:]) + + bit_index = val_len - 1 + for bit_char in val[1:]: + bit_val = bit_char + for var in var_list: + if alias == var["alias"]: + templateSize = "{0:0%db}" % (var["len"]) + word = templateSize.format(int(val[1:], 2)) + rev_word = word[::-1] + if ( + var["last"] != "2" + and var["last"] != rev_word[var["bit_index"]] + ): + var["toggle"] += 1 + var["last"] = rev_word[var["bit_index"]] + + bit_index -= 1 + + elif line[0] == "0" or line[0] == "1" or line[0] == "x": + bit_val = line[0] + alias = line[1:] + for var in var_list: + if alias == var["alias"] and var["len"] == 1: + if var["last"] != "2" and var["last"] != bit_val: + var["toggle"] += 1 + var["last"] = bit_val + if verbose: + count += 1 + print(f"Pass #2: {count}/{total}") + + duration = time_step - 1 + # 3rd pass: write file + + text_level = 0 + level = 0 + + count = 0 + vcd_file = file_read(vcd_file_path) + + def get_time_stamp(): + now = datetime.datetime.now() + year = '{:02d}'.format(now.year) + month = '{:02d}'.format(now.month) + day = '{:02d}'.format(now.day) + hour = '{:02d}'.format(now.hour) + minute = '{:02d}'.format(now.minute) + second = 
'{:02d}'.format(now.second) + date_string = '{}-{}-{} {}:{}:{}'.format(month, day, year, hour, minute, second) + return date_string + + saifile = open(saif, "w") + + saifile.write("(SAIFILE\n") + saifile.write('(SAIFVERSION "%s")\n' % saifversion) + saifile.write('(DIRECTION "%s")\n' % direction) + saifile.write('(DESIGN "%s")\n' % design) + saifile.write('(DATE "%s")\n' % get_time_stamp()) + saifile.write('(VENDOR "%s")\n' % vendor) + saifile.write('(PROGRAM_NAME "%s")\n' % program_name) + saifile.write('(VERSION "%s")\n' % version) + saifile.write("(DIVIDER %s)\n" % divider) + saifile.write("(TIMESCALE %s)\n" % timescale) + saifile.write("(DURATION %ld)\n" % duration) + + def saif_indent_level(level): + space = '' + for _ in range(level): + space += ' ' + return space + + for line in vcd_file: + search = re.search(r"\$scope", line) + if search is not None: + ls = line.split() + name = ls[2] + saifile.write( + "%s(INSTANCE %s\n" % (saif_indent_level(text_level), name) + ) + text_level += 1 + level += 1 + saifile.write("%s(NET\n" % (saif_indent_level(text_level))) + text_level += 1 + + # put variables + for var in var_list: + if var["parent"] == name and var["level"] == level: + if var["multi_bit"] == 0: + saifile.write( + "%s(%s\n" % (saif_indent_level(text_level), var["name"]) + ) + else: + saifile.write( + "%s(%s\\[%d\\]\n" + % ( + saif_indent_level(text_level), + var["name"], + var["bit_index"], + ) + ) + + saifile.write( + "%s (T0 %d) (T1 %d) (TX %d)\n" + % ( + saif_indent_level(text_level), + var["low"], + var["high"], + var["x"], + ) + ) + + saifile.write( + "%s (TC %d) (IG %d)\n" + % (saif_indent_level(text_level), var["toggle"], var["ig"]) + ) + + saifile.write("%s)\n" % (saif_indent_level(text_level))) + + text_level -= 1 + saifile.write("%s)\n" % (saif_indent_level(text_level))) + continue + + search = re.search(r"\$upscope", line) + if search is not None: + text_level -= 1 + level -= 1 + saifile.write("%s)\n" % (saif_indent_level(text_level))) + + if 
verbose: + count += 1 + print(f"Pass #3: {count}/{total}") + + saifile.write(")\n") + saifile.close() + + self.saif_parser(saif) + def resynth(self): ''' Calls resynthesis function to reduce circuit structure using logic synthesis optimizations/mapping @@ -800,8 +1154,10 @@ def resynth(self): :return: path-like string path to resynthetized file ''' - name=get_name(5) - self.netl_file =resynthesis(self.write_to_disk(name),self.tech_file,self.topmodule) + rtl = f"{self.output_folder}/{get_name(5)}.v" + self.write_to_disk(rtl) + + self.netl_file =resynthesis(rtl,self.tech_file,self.topmodule) netlist = Netlist(self.netl_file, self.technology) self.netl_root = netlist.root @@ -811,7 +1167,7 @@ def resynth(self): self.raw_outputs = netlist.raw_outputs self.raw_parameters = netlist.raw_parameters - os.remove(f'{self.output_folder}/{name}.v') + os.remove(rtl) return self.netl_file @@ -825,9 +1181,10 @@ def get_area(self, method = 'yosys'): ''' if method == 'yosys': - name=get_name(5) - area=ys_get_area(self.write_to_disk(name),self.tech_file,self.topmodule) - os.remove(f'{self.output_folder}/{name}.v') + rtl = f"{self.output_folder}/{get_name(5)}.v" + self.write_to_disk(rtl) + area=ys_get_area(rtl,self.tech_file,self.topmodule) + os.remove(rtl) return area else: diff --git a/circuiterror.py b/circuiterror.py index 822b84c..6154498 100644 --- a/circuiterror.py +++ b/circuiterror.py @@ -25,15 +25,17 @@ def extract_numbers(filename): return result -def compute_error(metric, original, approximate): +def compute_error(metric, original, approximate) -> float: ''' Computes the error between two different testbench output files + Raises a ValueError if the metric is invalid + Parameters ---------- metric : string equation to measure the error - options med, wce, wcre,mred, msed + options hd, med, wce, mred, msed original : string path to the original results text file approximate : string @@ -49,7 +51,12 @@ def compute_error(metric, original, approximate): original_len = 
len(original_output) approx_len = len(approximate_output) - assert original_len == approx_len, f"The output of the original and the approximate simulations doesn't match: {original_len}!={approx_len}. Make sure both outputs are being generated correctly." + assert original_len == approx_len, f""" +The output of the original and the approximate simulations doesn't match: {original_len}!={approx_len}. +Make sure both outputs are being generated correctly. +Original output: {original} +Approximate output: {approximate} +""" # compute the error distance ED := |a - a'| error_distance = [abs(original_output[x] - approximate_output[x]) @@ -62,18 +69,14 @@ def compute_error(metric, original, approximate): 0 if original_output[x] == 0 else error_distance[x]/original_output[x] for x in range(0,len(original_output))] - # Error Rate: - if (metric == "er"): - return round(sum((error>0 for error in error_distance))/total,3) - # Mean Hamming Distance see: https://stackoverflow.com/questions/40875282/fastest-way-to-get-hamming-distance-for-integer-array if (metric == "hd"): - hamming_distance=np.bitwise_xor(original_output,approximate_output) + hamming_distance=np.bitwise_xor(original_output,approximate_output, dtype=object) hamming_distance=[f'{hd:b}'.count('1') for hd in hamming_distance] - return round(np.mean(hamming_distance),3) + return round(float(np.mean(hamming_distance)),3) # Mean Error Distance MED := sum { ED(bj,b) * pj } - if (metric == "med"): + elif (metric == "med"): mean_error = sum(error_distance) / len(error_distance) return round(mean_error,3) @@ -90,3 +93,6 @@ def compute_error(metric, original, approximate): elif (metric == "msed"): msed = sum(square_error_distance)/len(square_error_distance) return round(msed,3) + + else: + raise ValueError(f"Invalid metric: {metric}") diff --git a/configuration.py b/configuration.py new file mode 100644 index 0000000..566d488 --- /dev/null +++ b/configuration.py @@ -0,0 +1,451 @@ +import os + +from enum import Enum +from 
typing import override + +from circuit import Circuit + + +class AlsMethod(str, Enum): + CONSTANT_INPUTS = "inconst" + CONSTANT_OUTPUTS = "outconst" + PROBPRUN = "probprun" + SIGNIFICANCE = "significance" + CCARVING = "ccarving" + DECISION_TREE = "decision_tree" + + @override + def __repr__(self): + return self.value + + @override + def __str__(self): + return self.value + + +class Metric(str, Enum): + HAMMING_DISTANCE = "hd" + MEAN_ERROR_DISTANCE = "med" + WORST_CASE_ERROR = "wce" + MEAN_RELATIVE_ERROR_DISTANCE = "mred" + MEAN_SQUARED_ERROR_DISTANCE = "msed" + ALS_TIME = "time" + AREA = "area" + + def to_user_friendly_display(self, value: float) -> str: + """ + Formats the value to a user friendly string format for display. + For example, the AREA metric is a percentage value, so it's formatted as + such. + """ + match self: + # Percentage metrics + case Metric.MEAN_RELATIVE_ERROR_DISTANCE | Metric.AREA: + return f"{round(value * 100, 2)}%" + case Metric.ALS_TIME: + return f"{round(value, 2)} s" + # No special handling except rounding + case _: + return str(round(value, 2)) + + def is_error_metric(self) -> bool: + # This set should contain all the Metrics that are related to + # approximation errors. + return self in { + Metric.HAMMING_DISTANCE, + Metric.MEAN_ERROR_DISTANCE, + Metric.WORST_CASE_ERROR, + Metric.MEAN_RELATIVE_ERROR_DISTANCE, + Metric.MEAN_SQUARED_ERROR_DISTANCE, + } + + +# List of iterative methods. +_ITERATIVE_METHODS = [ + AlsMethod.CONSTANT_INPUTS, + AlsMethod.CONSTANT_OUTPUTS, + AlsMethod.PROBPRUN, + AlsMethod.SIGNIFICANCE, + AlsMethod.CCARVING, +] + + +class ApproxSynthesisConfig: + """ + Configuration class for running Approximate Logic Synthesis with different methods. + + Parameters + ---------- + method : AlsMethod | str + One of the supported methods. Can use the AlsMethod enum or one of the + following string names: 'inconst', 'outconst', 'probprun', + 'significance', 'ccarving', or 'decision_tree'. 
+ + circuit : Circuit + A synthesized RTL circuit. See the circuit module. + + dataset : str + Path to the dataset file. + TODO: Document dataset file format. + + metrics : list[Metric | str] + Metrics to calculate for the execution. + The time metric is given in seconds, the area metric is given as a % of + the area of the original circuit. + + resynthesis : bool, default=False + Whether to use resynthesis. + + error : float (0 < x <= 1), optional + The maximum error threshold permitted. Required for iterative methods, + i.e. pruning methods. + + The error used is the Mean Relative Error Distance. + + validation : float (0 <= x < 1), optional + Specifies the proportion of the input dataset to be allocated to the + validation set. + + If provided, the dataset will be split into a validation set and a test + set. For example, a value of 0.2 means 20% of the dataset will be used + for validation, while 80% will be used for testing during ALS. This helps + assess whether the generated solution generalizes well to unseen circuit + inputs. + + A value of 0 indicates that the full dataset will be used for training, + similar to not providing this parameter. However, it can be useful when + generating csv data because the columns will be formatted to align with + other circuits that are using a validation set. + + max_iters : int, optional + Maximum amount of iterations to execute. Used in iterative methods, + i.e. pruning methods. + + prunes_per_iteration : int, default=1 + Number of pruning operations to perform per iteration during ALS. + Increasing this value can speed up pruning-based methods by reducing the + number of iterations. It doesn't affect ccarving since it already prunes + multiple nodes per iteration. If the resulting circuit exceeds the error + threshold, the algorithm backtracks to the last valid state before + continuing. If this parameter is too large, backtracking may take longer + than the initial search, especially for small circuits. 
+ + max_depth : int, optional + Required for 'decision_tree'. + + one_tree_per_output : bool, default=False + Used only by 'decision_tree' method. + If True, uses a separate tree per output. + If False, uses a single multi-output tree. + + output_significances: list[int] + List of significances of the circuit outputs. Should match the number of + output bits of the circuit. + + If not provided, a significance of 2**i will be assumed, where i is the + index of the output bit. (LSB has less significance than MSB.) + + Used by the 'significance' and 'ccarving' methods. + + show_progress : bool, default=False + Whether to show simulation progress. + + csv : str, optional + Path to a file for saving the output in CSV format. If the file does not + exist, it will be created with a header; if it exists, the output will be + appended. + + The output will be given as a single line with the following columns: + method, circuit, resynthesis, error, max_depth, one_tree_per_output, metric1, metric2, ... + + If the 'validation' option is given, the metrics will include validation results, formatted as: + + metric1, v_metric1, metric2, v_metric2, ... + + Where 'v_' indicates the metric's result on the validation set. This + applies only to error metrics. + + - bool values are stored as "True" or "False". + - optional fields (error, max_depth, one_tree_per_output) will + just be left blank if not provided. + """ + + # TODO: The csv option requires further thought/design. + # + # Currently, the configuration options included in the csv (resynthesis, + # error, max_depth and one_tree_per_output) were chosen arbitrarily given + # what was needed at the time when initially adding csv output; and are + # not necessarily more interesting in the general case than other options + # not included. 
+ # Perhaps the configuration options included in the CSV should also be + # configurable, or we should include any options that are not None, or we + # should always include every single possible option and metric in the csv, + # even those not specified. + + method: AlsMethod + circuit: Circuit + dataset: str + metrics: list[Metric] + resynthesis: bool + error: float | None + validation: float | None + max_iters: int | None + prunes_per_iteration: int + max_depth: int | None + one_tree_per_output: bool + output_significances: list[int] | None + show_progress: bool + csv: str | None + + def __init__( + self, + method: AlsMethod | str, + circuit: Circuit, + dataset: str, + metrics: list[Metric | str], + resynthesis: bool = False, + error: float | None = None, + validation: float | None = None, + max_iters: int | None = None, + prunes_per_iteration: int = 1, + max_depth: int | None = None, + one_tree_per_output: bool = False, + output_significances: list[int] | None = None, + show_progress: bool = False, + csv: str | None = None, + ): + """ + Instantiate and validate an ApproxSynthesisConfig. + + Raises + ------ + ValueError + If required parameters are missing or invalid. 
+ """ + self.method = _validate_method(method) + self.circuit = circuit + self.dataset = _validate_dataset(dataset) + self.metrics = _validate_metrics(metrics) + + self.resynthesis = resynthesis + self.error = _validate_error(error, self.method) + self.validation = _validate_validation(validation) + self.max_iters = max_iters + self.prunes_per_iteration = _validate_prunes_per_iteration(prunes_per_iteration) + + self.max_depth = _validate_max_depth(max_depth, self.method) + self.one_tree_per_output = one_tree_per_output + self.output_significances = _validate_output_significances( + self.circuit, output_significances + ) + self.show_progress = show_progress + self.csv = csv + + @override + def __repr__(self): + fields = ", ".join(f"{key}={value!r}" for key, value in self.__dict__.items()) + return f"{self.__class__.__name__}({fields})" + + def csv_columns(self) -> list[str]: + """ + Returns the names of the columns used if exporting this config's + execution to a CSV + """ + columns = [ + "method", + "circuit", + "resynthesis", + "error", + "max_depth", + "one_tree_per_output", + ] + for metric in self.metrics: + columns.append(metric.value) + if self.validation is not None and metric.is_error_metric(): + columns.append(f"v_{metric.value}") + + return columns + + def csv_values( + self, + results: dict[Metric, float], + validation_results: None | dict[Metric, float], + ) -> list[str]: + """ + Returns the values of the columns if exporting this config's execution to + a CSV row. + A Results dict must be provided, it is assumed it contains the results + for the metrics given to this config. + A validation Results dict can be provided, if it is, it's assumed it + contains the results for the error metrics provided to this config. 
+ """ + values = [ + self.method.value, + self.circuit.topmodule, + self.resynthesis, + self.error, + self.max_depth, + self.one_tree_per_output, + ] + + for metric in self.metrics: + values.append(results[metric]) + if validation_results is not None and metric.is_error_metric(): + # We use `get` because maybe the metric might not be in the dict + # if the 'validation' option was given with a value of 0 + values.append(validation_results.get(metric, None)) + + stringified_values = [ + str(value) if value is not None else "" for value in values + ] + + return stringified_values + + +def _validate_method(method: AlsMethod | str) -> AlsMethod: + """ + Validates the synthesis method. + + If `method` is a string, tries to convert it to an AlsMethod enum. + Raises a ValueError if the method name is invalid. + + Ensures consistency for downstream logic by enforcing enum usage. + """ + try: + method = AlsMethod(method) + except ValueError: + available_methods = ", ".join([method.value for method in AlsMethod]) + raise ValueError( + f"{method} is not a valid {AlsMethod.__name__}. Available methods are: {available_methods}" + ) + + return method + + +def _validate_metrics(metrics: list[str | Metric]) -> list[Metric]: + """ + Validates the metrics. + + If a metric is given as a string, this functions tries to convert it to a + Metric enum. + Raises a ValueError if the metric name is invalid. + + Ensures consistency for downstream logic by enforcing enum usage. + """ + + result_metrics: list[Metric] = [] + + for metric in metrics: + try: + result_metrics.append(Metric(metric)) + except ValueError: + available_metrics = ", ".join([metric.value for metric in Metric]) + raise ValueError( + f"{metric} is not a valid {Metric.__name__}. Available metrics are: {available_metrics}" + ) + + return result_metrics + + +def _validate_dataset( + dataset: str, +) -> str: + """ + Ensures the dataset file exists. 
+ TODO: We could maybe validate that the values match up with the circuit's inputs. + + Raises a ValueError if the dataset file is missing or doesn't match the + circuit inputs. + """ + if not os.path.isfile(dataset): + raise ValueError(f"Dataset file not found: {dataset}.") + + return dataset + + +def _validate_error( + error: float | None, + method: AlsMethod, +) -> float | None: + """ + Validates 'error'. + + Required for: + - all iterative methods + + Raises ValueError if missing in those cases. + """ + if method in _ITERATIVE_METHODS: + if error is None: + raise ValueError(f"'error' is required for method {method}") + + return error + + +def _validate_max_depth( + max_depth: int | None, + method: AlsMethod, +) -> int | None: + """ + Validates 'max_depth' for decision trees. + + Ensures it is provided for the 'decision_tree' method and aligns with + the number of circuit files if given as a list. + + Ensures values are valid: + - int: must be > 1 + + Raises ValueError if missing or mismatched. + """ + if method == AlsMethod.DECISION_TREE: + if max_depth is None: + raise ValueError(f"'max_depth' is required for method {method}.") + else: + if max_depth <= 1: + raise ValueError("max_depth must be > 1") + + return max_depth + + +def _validate_validation(validation: float | None) -> float | None: + """ + Validates the 'validation' parameter. + + Ensures that the value is a float within the range [0, 1). + If the value is None, it is considered valid and returned as is. + + Raises ValueError if the value is not in the specified range. + """ + if validation is not None: + if not (0 <= validation < 1.0): + raise ValueError( + "'validation' value must be a float in the range 0 <= x < 1." + ) + + return validation + + +def _validate_output_significances( + circuit: Circuit, + output_significances: list[int] | None, +) -> list[int] | None: + """ + Validates 'output_significances'. + + Ensures that if provided it matches in length the circuit's outputs. 
+ + Raises ValueError if mismatched. + """ + if output_significances is not None: + if len(output_significances) != len(circuit.outputs): + raise ValueError( + f"'output_significances' length ({len(output_significances)}) does not match the amount of circuit outputs ({len(circuit.outputs)})." + ) + + return output_significances + + +def _validate_prunes_per_iteration(prunes_per_iteration: int) -> int: + if prunes_per_iteration < 1: + raise ValueError("prunes_per_iteration must be at least 1.") + return prunes_per_iteration diff --git a/ml_algorithms/decision_tree.py b/ml_algorithms/decision_tree.py index e48062c..4a4395f 100644 --- a/ml_algorithms/decision_tree.py +++ b/ml_algorithms/decision_tree.py @@ -1,5 +1,4 @@ from collections import OrderedDict -from typing import List import numpy as np from sklearn.tree import DecisionTreeClassifier from sklearn.tree._tree import Tree @@ -42,19 +41,19 @@ class DecisionTreeCircuit: Useful parameters include but are not limited to: max_depth, """ - clf: DecisionTreeClassifier | List[DecisionTreeClassifier] + clf: DecisionTreeClassifier | list[DecisionTreeClassifier] one_tree_per_output: bool - inputs: List[CircuitVariable] - outputs: List[CircuitVariable] + inputs: list[CircuitVariable] + outputs: list[CircuitVariable] _trained: bool - circuit_inputs: List[str] - circuit_outputs: List[str] + circuit_inputs: list[str] + circuit_outputs: list[str] def __init__( self, - circuit_inputs: List[str], - circuit_outputs: List[str], + circuit_inputs: list[str], + circuit_outputs: list[str], one_tree_per_output=False, **kwargs, ): @@ -72,7 +71,7 @@ def __init__( else: self.clf = DecisionTreeClassifier(**kwargs) - def train(self, X: List[List[int]], y: List[List[int]]): + def train(self, X: list[list[int]], y: list[list[int]]): """Train the decision tree classifier(s) with the training set (X, y). 
Parameters @@ -120,7 +119,7 @@ def to_verilog_file(self, topmodule: str, output_file: str): raw_inputs = [ f"input {variable.name};" if variable.bits == 1 - else f"input [{variable.bits}:0] {variable.name};" + else f"input [{variable.bits - 1}:0] {variable.name};" for variable in self.inputs ] raw_outputs = [ @@ -153,7 +152,7 @@ def to_verilog_file(self, topmodule: str, output_file: str): f.write("endmodule\n") -def _to_binary(x: List[List[int]], bit_widths: List[int]): +def _to_binary(x: list[list[int]], bit_widths: list[int]): """Convert a list of lists of integers to a binary representation. This function takes a list input rows `x` and a list of bit widths @@ -169,9 +168,9 @@ def _to_binary(x: List[List[int]], bit_widths: List[int]): Parameters ---------- - x : List[List[int]] + x : list[list[int]] A list of lists of integers, where each inner list represents a row of input data. - bit_widths : List[int] + bit_widths : list[int] A list of integers, where each value represents the number of bits to use for the corresponding column in the input data. @@ -215,7 +214,7 @@ def _to_binary(x: List[List[int]], bit_widths: List[int]): return result -def _parse_circuit_variables(variable_list: List[str]): +def _parse_circuit_variables(variable_list: list[str]): """Parse a list of circuit variable names and bit widths. TODO: This function should be put in a common module to be used by future ML @@ -223,13 +222,13 @@ def _parse_circuit_variables(variable_list: List[str]): Parameters ---------- - input_list : List[str] + input_list : list[str] A list of strings representing circuit variables, where each variable can be either a single-bit variable (e.g., 'cin') or a multi-bit variable (e.g., 'in1[3]'). Returns ------- - List[CircuitVariable] + list[CircuitVariable] A list of `CircuitVariable` objects, where each object represents a circuit variable with a name and bit width. 
""" @@ -269,16 +268,17 @@ def _tree_2_equation( Returns ------- - str or None - A Boolean expression string for the subtree rooted at `node`, or None if + str or int + A Boolean expression string for the subtree rooted at `node`, or an int + if the output is constant for the subtree (0 or 1) the subtree always evaluates to 0. """ if tree.feature[node] == -2: # Leaf node result = tree.value[node][output].argmax() if result == 0: - return None + return 0 else: - return "LEAF_NODE_1" + return 1 else: # Internal node left_result = _tree_2_equation( @@ -294,24 +294,24 @@ def _tree_2_equation( negated_input = f"!{input}" match (left_result, right_result): - case (None, None): - return None + case (0, 0): + return 0 - case (None, "LEAF_NODE_1"): + case (0, 1): return input - case ("LEAF_NODE_1", None): + case (1, 0): return negated_input - case ("LEAF_NODE_1", "LEAF_NODE_1"): - return "LEAF_NODE_1" + case (1, 1): + return 1 - case (str(left), "LEAF_NODE_1"): + case (str(left), 1): return f"{input} | ({left})" - case ("LEAF_NODE_1", str(right)): + case (1, str(right)): return f"{negated_input} | ({right})" - case (str(left), None): + case (str(left), 0): return f"{negated_input} & ({left})" - case (None, str(right)): + case (0, str(right)): return f"{input} & ({right})" case (str(left), str(right)): return f"({negated_input} & ({left})) | ({input} & ({right}))" diff --git a/netlist.py b/netlist.py index f7f4c55..87ff499 100644 --- a/netlist.py +++ b/netlist.py @@ -52,13 +52,13 @@ def __init__(self, netl_file, technology): with open(netl_file, 'r') as circuit_file: content = circuit_file.read() - self.raw_outputs, self.circuit_outputs = self.get_outputs(content) - self.raw_inputs, self.circuit_inputs = self.get_inputs(content) - expreg = r'module [a-zA-Z0-9_]*\s*\(([\s\S]+?)\);' parameters = re.search(expreg,content) self.raw_parameters = re.sub('\n','',parameters.group(1)) + self.raw_outputs, self.circuit_outputs = self.get_outputs(content, self.raw_parameters) + 
self.raw_inputs, self.circuit_inputs = self.get_inputs(content, self.raw_parameters) + assigns = parse_assigns(content) for a in assigns: self.assignments.append(a) @@ -155,125 +155,110 @@ def to_xml(self): return root + def get_inputs(self, netlist_rtl, raw_parameters): + """ + Extracts the circuit's input variables. - def get_inputs(self, netlist_rtl): - ''' - Extracts the input variables of the circuit - TODO: support one bit variables + Inputs are returned in two ways: + - `circuit_inputs`: expanded, sorted MSB→LSB, and follow the order in `raw_parameters`. + - `raw_inputs`: unexpanded lines, sorted to match the same order. - The `circuit_inputs` will be returned from MSB -> LSB. This is important - to provide the inputs in the correct order to methods that map a circuit - representation to a Verilog format, like the Decision Tree method. + This ordering ensures compatibility with Verilog-generating methods like + the Decision Tree, which expect to receive bit-accurate and positionally + correct inputs in order to replicate them. + + TODO: support one bit variables Parameters ---------- - netlist_rtl : string - content of the netlist file + netlist_rtl : str + Content of the netlist file. + raw_parameters : str + Module's parameter list string. 
Returns ------- - array - list of circuit intputs - ''' + tuple[list[str], list[str]] + raw_inputs and circuit_inputs + """ raw_inputs = [] circuit_inputs = [] inputs = re.findall( - r'input\s*(\[([0-9]*):([0-9]*)\])*\s*([a-zA-Z0-9]*)',netlist_rtl) + r"input\s*(\[([0-9]*):([0-9]*)\])*\s*([a-zA-Z0-9]*)", netlist_rtl + ) for i in inputs: - if i[0] != '': + if i[0] != "": left = int(i[1]) right = int(i[2]) - if (left > right): - for x in range(left, right-1, -1): - circuit_inputs.append(i[3]+'['+str(x)+']') + if left > right: + for x in range(left, right - 1, -1): + circuit_inputs.append(i[3] + "[" + str(x) + "]") else: - for x in range(left,right+1): - circuit_inputs.append(i[3]+'['+str(x)+']') + for x in range(left, right + 1): + circuit_inputs.append(i[3] + "[" + str(x) + "]") raw_inputs.append(f"input [{i[1]}:{i[2]}] {i[3]};") else: circuit_inputs.append(f"{i[3]}") raw_inputs.append(f"input {i[3]};") + + circuit_inputs = sort_expanded_vars(circuit_inputs, raw_parameters) + raw_inputs = sort_raw_vars(raw_inputs, raw_parameters) return raw_inputs, circuit_inputs + def get_outputs(self, netlist_rtl, raw_parameters): + """ + Extracts the circuit's output variables. - def get_outputs(self, netlist_rtl): - ''' - Extracts the output variables of the circuit - TODO: support one bit variables + Outputs are returned in two ways: + - `circuit_outputs`: expanded, sorted MSB→LSB, and follow the order in `raw_parameters`. + - `raw_outputs`: unexpanded lines, sorted to match the same order. - The `circuit_outputs` will be returned from MSB -> LSB. This is - important to provide the outputs in the correct order to methods that - map a circuit representation to a Verilog format, like the Decision Tree - method. + This ordering ensures compatibility with Verilog-generating methods like + the Decision Tree, which expect to receive bit-accurate and positionally + correct inputs in order to replicate them. 
+ + TODO: support one bit variables Parameters ---------- netlist_rtl : string content of the netlist file + raw_parameters : str + Module's parameter list string. + Returns ------- - array - list of circuit outputs - ''' + tuple[list[str], list[str]] + raw_outputs and circuit_outputs + """ raw_outputs = [] circuit_outputs = [] outputs = re.findall( - r'output\s*(\[([0-9]*):([0-9]*)\])*\s*([a-zA-Z0-9]*)',netlist_rtl) + r"output\s*(\[([0-9]*):([0-9]*)\])*\s*([a-zA-Z0-9]*)", netlist_rtl + ) for o in outputs: - if o[0] != '': + if o[0] != "": left = int(o[1]) right = int(o[2]) - if (left > right): - for x in range(left, right-1, -1): + if left > right: + for x in range(left, right - 1, -1): circuit_outputs.append(f"{o[3]}[{str(x)}]") else: - for x in range(left,right+1): + for x in range(left, right + 1): circuit_outputs.append(f"{o[3]}[{str(x)}]") raw_outputs.append(f"output [{o[1]}:{o[2]}] {o[3]};") else: circuit_outputs.append(f"{o[3]}") raw_outputs.append(f"output {o[3]};") - return raw_outputs, circuit_outputs - -def expand_range(name): - ''' - Expands a Verilog-style bit range expression into a list of individual bits. - - Parameters - ---------- - name : string - A string like "a[3:0]" or "b[7]". - - Returns - ------- - List[string] - A list of strings like ["a[3]", "a[2]", "a[1]", "a[0]"]. 
- - Examples - ------- - >>> expand_range("a[3:1]") - ['a[3]', 'a[2]', 'a[1]'] - >>> expand_range("x[1:3]") - ['x[1]', 'x[2]', 'x[3]'] + circuit_outputs = sort_expanded_vars(circuit_outputs, raw_parameters) + raw_outputs = sort_raw_vars(raw_outputs, raw_parameters) + return raw_outputs, circuit_outputs - >>> expand_range("y[5]") - ['y[5]'] - - >>> expand_range("z") - ['z'] - ''' - m = re.match(r'(\w+)\[(\d+):(\d+)\]', name) - if not m: - return [name] - var, hi, lo = m.groups() - hi, lo = int(hi), int(lo) - step = -1 if hi > lo else 1 - return [f"{var}[{i}]" for i in range(hi, lo + step, step)] def expand_concat(expr): - ''' + """ Expands a Verilog-style concatenation expression into a flat list of individual bits. @@ -284,7 +269,7 @@ def expand_concat(expr): Returns ------- - List[string] + list[string] A list of strings like ["a[3]", "a[2]", "a[1]", "a[0]", "b[1]", "c"]. Examples @@ -300,11 +285,11 @@ def expand_concat(expr): >>> expand_concat("b[1:0]") ['b[1]', 'b[0]'] - ''' + """ expr = expr.strip() - if expr.startswith('{') and expr.endswith('}'): + if expr.startswith("{") and expr.endswith("}"): inner = expr[1:-1] - parts = [p.strip() for p in inner.split(',')] + parts = [p.strip() for p in inner.split(",")] bits = [] for p in parts: bits.extend(expand_range(p)) @@ -312,6 +297,100 @@ def expand_concat(expr): else: return expand_range(expr) + +def expand_range(expr): + """ + Expands a Verilog-style range expression into a flat list of individual bits + or constants. + + Parameters + ---------- + expr : string + A Verilog signal, range or constant. + + Returns + ------- + list[string] + A list of strings or bits.
+ + Examples + ------- + >>> expand_range("a[3:1]") + ['a[3]', 'a[2]', 'a[1]'] + + >>> expand_range("x[1:3]") + ['x[1]', 'x[2]', 'x[3]'] + + >>> expand_range("y[5]") + ['y[5]'] + + >>> expand_range("z") + ['z'] + + >>> expand_range("4'hd") + [1, 1, 0, 1] + """ + expr = expr.strip() + if "'" in expr: + return expand_constant(expr) + elif "[" in expr: + if ":" in expr: + base, range_part = expr.split("[") + range_part = range_part[:-1] + start, end = map(int, range_part.split(":")) + step = -1 if start > end else 1 + return [f"{base}[{i}]" for i in range(start, end + step, step)] + else: + return [expr] + else: + return [expr] + + +def expand_constant(expr): + """ + Expands a Verilog-style constant into a flat list of individual bits. + + Parameters + ---------- + expr : string + A Verilog constant like "3'h6" or "4'd13". Only hexadecimal ('h) and + decimal ('d) formats are supported. + TODO: Support other bases like binary ('b) or octal ('o). Support for + these hasn't been added because we haven't run into a scenario where + yosys assigns a constant with these bases. + + Returns + ------- + list[int] + A list of bits like [1, 1, 0]. 
+ + Examples + ------- + >>> expand_constant("1'h1") + [1] + + >>> expand_constant("4'hd") + [1, 1, 0, 1] + + >>> expand_constant("4'd13") + [1, 1, 0, 1] + """ + size, rest = expr.split("'") + size = int(size) + base = rest[0].lower() + value = rest[1:] + + if base == "h": + int_value = int(value, 16) + elif base == "d": + int_value = int(value, 10) + else: + raise ValueError(f"Unsupported constant format: {expr}") + + bits = [(int_value >> i) & 1 for i in range(size - 1, -1, -1)] + return bits + + def parse_assigns(content): ''' Parses Verilog assign statements and expands any bit ranges into individual @@ -322,6 +401,17 @@ def parse_assigns(content): - Ports mapped to wires by Yosys - Constant assignments in resynth + If the LHS is a full variable and the RHS is a concatenation or range with + multiple bits, the LHS is automatically expanded to match the RHS + bit width. For example: + + assign out = { a[1], b[0] }; + + ...will produce: + + [('out[1]', 'a[1]'), + ('out[0]', 'b[0]')] + Parameters ---------- content : string @@ -329,7 +419,7 @@ def parse_assigns(content): Returns ------- - List[Tuple[string, string]] + list[Tuple[string, string]] A list of (lhs, rhs) assignment pairs, one for each individual bit. Examples @@ -339,28 +429,104 @@ def parse_assigns(content): ... assign foo[1:0] = bar[3:2]; ... assign x = 0; ... assign { out[4:3], out[0:1] } = { in1[3], in2[1:0], in3[2] }; + ... assign out = { in1[0:1], in2[0:1] }; ... """ >>> parse_assigns(code) [('a[2]', 'b[2]'), ('foo[1]', 'bar[3]'), ('foo[0]', 'bar[2]'), ('x', '0'), - ('out[4]', 'in1[3]'), ('out[3]', 'in2[1]'), ('out[0]', 'in2[0]'), ('out[1]', 'in3[2]') + ('out[4]', 'in1[3]'), ('out[3]', 'in2[1]'), ('out[0]', 'in2[0]'), ('out[1]', 'in3[2]'), + ('out[3]', 'in1[0]'), ('out[2]', 'in1[1]'), ('out[1]', 'in2[0]'), ('out[0]', 'in2[1]') ] - - TODO: This method can't handle range or concatenated assignments to full - variables.
For example in the following case it will cause a "Bit width - mismatch" error even if `out` is a 4 bit variable, because it doesn't know - that: - - assign out = { in1[0:1], in2[0:1] } ''' - expreg = r'assign\s+(.*?)\s*=\s*(.*?);' + expreg = r"assign\s+(.*?)\s*=\s*(.*?);" assigns = re.findall(expreg, content) result = [] for lhs, rhs in assigns: lhs_bits = expand_concat(lhs) rhs_bits = expand_concat(rhs) + + # if LHS is a single bare name but RHS is wide, expand LHS to match + if len(lhs_bits) == 1 and lhs_bits[0] == lhs and len(rhs_bits) > 1: + width = len(rhs_bits) + # msb = width-1 down to 0 + lhs_bits = [f"{lhs}[{i}]" for i in range(width - 1, -1, -1)] + if len(lhs_bits) != len(rhs_bits): raise ValueError(f"Bit width mismatch: LHS {lhs_bits} != RHS {rhs_bits}") + result.extend(zip(lhs_bits, rhs_bits)) return result + + +def extract_param_names(raw_parameters): + """ + Extracts the names of input/output/inout parameters from a module definition string. + + Parameters + ---------- + raw_parameters : str + String of the module's parameter list, e.g. "input a, output [3:0] b". + + Returns + ------- + list[str] + Ordered list of parameter names as they appear in the string. + """ + return re.findall( + r"\b(?:input|output|inout)?\s*(?:\[.*?\]\s*)?(\w+)", raw_parameters + ) + + +def sort_expanded_vars(expanded_vars, raw_parameters): + """ + Sorts a list of expanded signal names (e.g. in[3], in[2], ..., in[0]) + based on their order in the module definition and from MSB to LSB. + + Parameters + ---------- + expanded_vars : list[str] + List of bit-level signal names. + raw_parameters : str + Module parameter list string for determining signal order. + + Returns + ------- + list[str] + Sorted list of expanded variables. 
+ """ + param_order = extract_param_names(raw_parameters) + order_map = {name: i for i, name in enumerate(param_order)} + + def sort_key(var): + base, idx = re.match(r"(\w+)(?:\[(\d+)\])?", var).groups() + return (order_map[base], -int(idx) if idx else 0) + + return sorted(expanded_vars, key=sort_key) + + +def sort_raw_vars(raw_list, raw_parameters): + """ + Sorts unexpanded input/output/inout declarations by the order of their + parameter names in the module definition. + + Parameters + ---------- + raw_list : list[str] + List of unexpanded variable declarations, e.g. "input [3:0] a;". + raw_parameters : str + Module parameter list string for determining signal order. + + Returns + ------- + list[str] + Sorted list of raw declarations. + """ + param_order = extract_param_names(raw_parameters) + order_map = {name: i for i, name in enumerate(param_order)} + + def sort_key(line): + match = re.search(r"(\w+)\s*;", line) + return order_map.get(match.group(1), float("inf")) if match else float("inf") + + return sorted(raw_list, key=sort_key) diff --git a/poisonoak.config b/poisonoak.config deleted file mode 100644 index ddab841..0000000 --- a/poisonoak.config +++ /dev/null @@ -1 +0,0 @@ -RTL=/home/sudohumberto/circuits/brent.kung.16b/UBBKA_15_0_15_0.v diff --git a/poisonoak.help b/poisonoak.help deleted file mode 100644 index 2fc1706..0000000 --- a/poisonoak.help +++ /dev/null @@ -1,5 +0,0 @@ -In order to use the program as a tool, you need to setup a config file. - -1. Create a file named 'poisonoak.config' inside poisonoak folder: - -2. 
Content of the config files diff --git a/runner.py b/runner.py new file mode 100644 index 0000000..3432905 --- /dev/null +++ b/runner.py @@ -0,0 +1,724 @@ +from collections import deque +from collections.abc import Callable +import csv +import copy +import os +import time +from xml.etree import ElementTree +from circuit import Circuit +from circuiterror import compute_error +from ml_algorithms.decision_tree import DecisionTreeCircuit +from pruning_algorithms.ccarving import FindCut +from pruning_algorithms.glpsignificance import GetbySignificance, LabelCircuit +from pruning_algorithms.inouts import GetInputs, GetOutputs +from pruning_algorithms.probprun import GetOneNode +from utils import read_dataset +from configuration import AlsMethod, ApproxSynthesisConfig, Metric + +type Results = dict[Metric, float] + +# Directory to output build files to. +# TODO: Consider making this a config parameter. +BUILD_DIR = f"{os.path.dirname(__file__)}/build" + +# Files generated by the different methods. Defined here to just reuse in the +# code. +APPROX_RTL = f"{BUILD_DIR}/.approx.v" + +RESYNTH_RTL = f"{BUILD_DIR}/.resynth.v" + +EXACT_OUTPUT = f"{BUILD_DIR}/.exact_output" +APPROX_OUTPUT = f"{BUILD_DIR}/.approx_output" +TEMP_OUTPUT = f"{BUILD_DIR}/.tmp_output" + +TB = f"{BUILD_DIR}/.tb.v" + +VALIDATION_DATASET = f"{BUILD_DIR}/.v_dataset" +VALIDATION_TB = f"{BUILD_DIR}/.v_tb.v" +VALIDATION_EXACT_OUTPUT = f"{BUILD_DIR}/.v_exact_output" +VALIDATION_APPROX_OUTPUT = f"{BUILD_DIR}/.v_approx_output" + +VCD = f"{BUILD_DIR}/.vcd" +VCD_TB = f"{BUILD_DIR}/.vcd_tb.v" +SAIF = f"{BUILD_DIR}/.saif" + + +def run(config: ApproxSynthesisConfig) -> tuple[Results, Results | None]: + """ + Runner function for an execution specified by a valid ApproxSynthesisConfig. + This function will do the following steps: + - Simulate the exact circuit + - Carry out the given ALS method + - Calculate the metrics given in config.metrics and return them as a Results + dict. 
+ + If the 'validation' option is set, it will also return a second Results + object which contains the error metrics specified, calculated on the + validation set. It won't include non-error metrics like area or time since + those won't change based on the dataset. + """ + if not os.path.exists(BUILD_DIR): + os.makedirs(BUILD_DIR) + + _create_tbs_and_exact_outputs(config) + + # The benchmark functions should return the final approximated circuit (for + # area calculation and validation simulation) and also carry out the final + # simulation to generate the APPROX_OUTPUT that will be used to calculate + # error metrics. + # + # We want the benchmark_fn to carry out this final simulation because a lot + # of the methods need to carry out simulations in order to iterate (for + # example the constant inputs and outputs methods), so it's better to make + # use of those simulations instead of re-running the same sim outside of the + # benchmark_fn. + benchmark_fn: Callable[[ApproxSynthesisConfig], Circuit] + match config.method: + case AlsMethod.CONSTANT_INPUTS: + benchmark_fn = _run_constant_inputs + case AlsMethod.CONSTANT_OUTPUTS: + benchmark_fn = _run_constant_outputs + case AlsMethod.PROBPRUN: + _create_saif(config) + benchmark_fn = _run_probprun + case AlsMethod.SIGNIFICANCE: + benchmark_fn = _run_significance + case AlsMethod.CCARVING: + benchmark_fn = _run_ccarving + case AlsMethod.DECISION_TREE: + benchmark_fn = _run_decision_tree + + # Timed code includes: + # - Execution of ALS method. + # - Simulation of approximated circuit. + # - Calculation of all metrics. + # + # We include simulations and calculation of all metrics because, even though + # they don't contribute directly to generating the final circuit, they are + # a necessary part of ALS in order to learn the circuit's characteristics + # and whether it's a worthwhile candidate. 
+ start_time = time.perf_counter() + + original_area = float(config.circuit.get_area()) + + approx_circuit = benchmark_fn(config) + + if config.validation is not None: + approx_circuit.simulate(VALIDATION_TB, VALIDATION_APPROX_OUTPUT) + + results, validation_results = _compute_error_metrics(config) + + if Metric.AREA in config.metrics: + approx_area = float(approx_circuit.get_area()) + results[Metric.AREA] = approx_area / original_area + + end_time = time.perf_counter() + + if Metric.ALS_TIME in config.metrics: + elapsed_time = end_time - start_time + results[Metric.ALS_TIME] = elapsed_time + + # For debugging or checking the final output + approx_circuit.write_to_disk(APPROX_RTL) + + if config.csv: + _write_results_to_csv(config, results, validation_results) + + return results, validation_results + + +def _create_saif(config: ApproxSynthesisConfig): + """ + If the circuit doesn't have timing information, create a SAIF file and + annotate the circuit with its data. + """ + + nodes = config.circuit.netl_root.findall("node") + node_outputs = [node.findall("output")[0] for node in nodes] + circuit_has_timing_info = all( + "t1" in node_output.attrib for node_output in node_outputs + ) + + if circuit_has_timing_info: + print("Circuit already has timing info, skipping SAIF generation") + return + + config.circuit.write_tb( + VCD_TB, + config.dataset, + dump_vcd=VCD, + show_progress=config.show_progress, + ) + config.circuit.exact_output(VCD_TB, EXACT_OUTPUT) + config.circuit.generate_saif_from_vcd(SAIF, VCD) + + +def _write_results_to_csv( + config: ApproxSynthesisConfig, results: Results, validation_results: None | Results +): + """ + Writes the execution results to a CSV file as a single row. 
+ """ + assert config.csv is not None, ( + "_write_results_to_csv should only be called if a csv file was given by the user" + ) + + file_exists = os.path.isfile(config.csv) + + with open(config.csv, mode="a", newline="") as file: + writer = csv.writer(file) + # If the file does not exist, write the header + if not file_exists: + writer.writerow(config.csv_columns()) + + writer.writerow(config.csv_values(results, validation_results)) + + +def _run_decision_tree(config: ApproxSynthesisConfig) -> Circuit: + exact_circuit = config.circuit + outputs = read_dataset(EXACT_OUTPUT, 10) + inputs = read_dataset(config.dataset, 16, max_lines=len(outputs)) + # We use max_lines because the output set might be smaller due to a + # validation set being used + + tree = DecisionTreeCircuit( + exact_circuit.inputs, + exact_circuit.outputs, + one_tree_per_output=config.one_tree_per_output, + max_depth=config.max_depth, + ) + + tree.train(inputs, outputs) + tree.to_verilog_file(exact_circuit.topmodule, APPROX_RTL) + print("Synthesizing circuit from trained decision tree, this might take a while...") + tree_circuit = Circuit( + APPROX_RTL, exact_circuit.tech_file, topmodule=exact_circuit.topmodule + ) + + if config.resynthesis: + tree_circuit.resynth() + + tree_circuit.simulate(TB, APPROX_OUTPUT) + + return tree_circuit + + +def _run_constant_inputs(config: ApproxSynthesisConfig) -> Circuit: + return _run_constant_inputs_outputs(config, config.circuit.inputs, "inputs") + + +def _run_constant_outputs(config: ApproxSynthesisConfig) -> Circuit: + return _run_constant_inputs_outputs(config, config.circuit.outputs, "outputs") + + +def _run_constant_inputs_outputs( + config: ApproxSynthesisConfig, circuit_variables: list[str], inputs_or_outputs: str +) -> Circuit: + """ + The InOuts method accepts either a list of inputs or outputs to make + constant, and then returns a list of nodes that could be pruned. 
+ + The selection of which input/outputs to make constant is not part of the + InOuts method, so each user must select them under whichever criteria fits + their use case best. + + For this runner execution, which must use a generic heuristic for any + circuit, we'll select the LSBs of each input/output to be constant. If we + manage to prune all the suggested nodes without going over the error + threshold or the max iterations then we can use the next LSB of each + input/output. For example, in a circuit with the input/outputs: + + ["in1[2]", "in1[1]", "in1[0]", "in2[2]", "in2[1]", "in2[0]", cin] + + We'll first prune the nodes suggested when ["in1[0]", "in2[0]", cin] are set + as constants. If we delete all the suggested nodes then we'll move on to the + nodes suggested when ["in1[0]", "in1[1]", "in2[0]", "in2[1]", cin] are + constants. + """ + circuit = config.circuit + + assert config.error is not None, ( + f"'error' should be given when executing {config.method}" + ) + + max_const_bit = 0 + iteration = 0 + max_iters = config.max_iters if config.max_iters else float("inf") + + while iteration < max_iters: + deletable_nodes: list[ElementTree.Element] + nodes_to_delete: list[ElementTree.Element] = [] + + while len(nodes_to_delete) < config.prunes_per_iteration: + const_variables = _get_lsbs_up_to(circuit_variables, max_const_bit) + + match inputs_or_outputs: + case "inputs": + deletable_nodes = GetInputs(circuit.netl_root, const_variables) + case "outputs": + deletable_nodes = GetOutputs(circuit.netl_root, const_variables) + case _: + raise ValueError("Invalid call to _run_constant_inputs_outputs") + + # Filter Already deleted nodes + deletable_nodes_filtered = [ + node + for node in deletable_nodes + if ( + node.get("delete") != "yes" + and node.attrib["var"] + not in [ + node_to_delete.attrib["var"] + for node_to_delete in nodes_to_delete + ] + ) + ] + + max_nodes_to_append = config.prunes_per_iteration - len(nodes_to_delete) + for node in 
deletable_nodes_filtered[:max_nodes_to_append]: + nodes_to_delete.append(node) + deletable_nodes_filtered.remove(node) + + if len(deletable_nodes_filtered) == 0: + # If all deletable nodes are in nodes_to_delete, we can increase + # max_const_bit and go again or exit if all the eligible + # variables (i.e. all inputs or all outputs) have been tried out + # as const variables. + if set(const_variables) == set(circuit_variables): + if len(nodes_to_delete) != 0: + # There might be some nodes to delete from a previous + # iteration of this loop, so finish deleting those. + break + else: + # Nothing left to do, finish execution. + return circuit + else: + # All deletable nodes were added to nodes_to_delete but + # there's still more nodes to delete, increase max_const_bit + max_const_bit += 1 + continue + + nodes_to_delete_names = [node.attrib["var"] for node in nodes_to_delete] + print(f"Iteration {iteration + 1}: Pruning nodes {nodes_to_delete_names}") + + for node in nodes_to_delete: + node.set("delete", "yes") + + if config.resynthesis: + resynth_circuit = copy.copy(circuit) + resynth_circuit.resynth() + error = resynth_circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + else: + error = circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + + print(f"Pruned circuit error: {error}") + + if error > config.error: + print("Error has overpassed threshold, backtracking...\n") + circuit = _undo_prunes( + circuit, nodes_to_delete, config.error, config.resynthesis + ) + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + break + else: + if config.resynthesis: + circuit = resynth_circuit + + iteration += 1 + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + + return circuit + + +def _run_probprun(config: ApproxSynthesisConfig) -> Circuit: + # TODO (Possible improvement): After pruning some nodes, if we re-simulate + # the circuit re-generating the vcd file, and with the new vcd
re-generate + # the SAIF, one can notice a different timing behaviour from the remaining + # existing nodes. We don't re-simulate and re-generate the SAIF because the + # python method to regenerate the SAIF takes really long even for small + # datasets. (Example: BK_16b, 4000 inputs, takes ~30 seconds to generate + # SAIF.) But, if we find a way to generate the SAIF file quickly, for example + # using a faster language for it, we might want to regenerate it on every + # iteration or every N iterations. + + circuit = config.circuit + circuit_root = circuit.netl_root + + assert config.error is not None, ( + f"'error' should be given when executing {config.method}" + ) + + iteration = 0 + max_iters = config.max_iters if config.max_iters else float("inf") + + probprun = GetOneNode(circuit_root) + while iteration < max_iters: + nodes_to_delete = [] + nodes_info = [] + + for (node, output, time_percent), _ in zip( + probprun, range(config.prunes_per_iteration) + ): + node_to_delete = circuit_root.find(f"./node[@var='{node}']") + + assert node_to_delete is not None, ( + f"Node {node} suggested by ProbPrun should be findable in the circuit" + ) + + nodes_to_delete.append(node_to_delete) + nodes_info.append((output, time_percent)) + + if len(nodes_to_delete) == 0: + # If no nodes were appended it means there's no nodes left to delete + return circuit + + nodes_to_delete_names = [node.attrib["var"] for node in nodes_to_delete] + + print( + f"Iteration {iteration + 1}: Pruning nodes {nodes_to_delete_names} because:" + ) + for node, (output, time_percent) in zip(nodes_to_delete_names, nodes_info): + print(f"{node} is {output} {time_percent}% of the time") + + for node in nodes_to_delete: + node.set("delete", "yes") + + if config.resynthesis: + resynth_circuit = copy.copy(circuit) + resynth_circuit.resynth() + error = resynth_circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + else: + error = 
circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + + print(f"Pruned circuit error: {error}") + + if error > config.error: + print("Error has overpassed threshold, backtracking...\n") + circuit = _undo_prunes( + circuit, nodes_to_delete, config.error, config.resynthesis + ) + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + break + else: + if config.resynthesis: + circuit = resynth_circuit + + iteration += 1 + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + + return circuit + + +def _run_significance(config: ApproxSynthesisConfig) -> Circuit: + circuit = config.circuit + circuit_root = circuit.netl_root + + assert config.error is not None, ( + f"'error' should be given when executing {config.method}" + ) + + iteration = 0 + max_iters = config.max_iters if config.max_iters else float("inf") + + if config.output_significances is not None: + output_significances = config.output_significances + else: + output_significances = [] + + nodes_sorted_by_significance = GetbySignificance(circuit_root, output_significances) + while iteration < max_iters: + nodes_to_delete = [] + nodes_info = [] + + for (node, significance), _ in zip( + nodes_sorted_by_significance, range(config.prunes_per_iteration) + ): + node_to_delete_ = circuit_root.find(f"./node[@var='{node}']") + + assert node_to_delete_ is not None, ( + f"Node {node} suggested by GetbySignificance should be findable in the circuit" + ) + nodes_to_delete.append(node_to_delete_) + nodes_info.append(significance) + + nodes_to_delete_names = [node.attrib["var"] for node in nodes_to_delete] + print( + f"Iteration {iteration + 1}: Pruning nodes {nodes_to_delete_names} because:" + ) + for node, significance in zip(nodes_to_delete_names, nodes_info): + print(f"{node} has {significance} significance") + + for node in nodes_to_delete: + node.set("delete", "yes") + + if config.resynthesis: + resynth_circuit = copy.copy(circuit) + resynth_circuit.resynth() + error = 
resynth_circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + else: + error = circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + + print(f"Pruned circuit error: {error}") + + if error > config.error: + print("Error has overpassed threshold, backtracking...\n") + circuit = _undo_prunes( + circuit, nodes_to_delete, config.error, config.resynthesis + ) + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + break + else: + if config.resynthesis: + circuit = resynth_circuit + + iteration += 1 + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + + return circuit + + +def _run_ccarving(config: ApproxSynthesisConfig) -> Circuit: + circuit = config.circuit + + assert config.error is not None, ( + f"'error' should be given when executing {config.method}" + ) + + iteration = 0 + max_iters = config.max_iters if config.max_iters else float("inf") + + if config.output_significances is not None: + output_significances = config.output_significances + else: + output_significances = [] + + # TODO: Allow specifying the diff threshold in the config + diff_threshold = 2 ** (len(circuit.outputs)) - 1 + + # TODO: NEED to add harshness_level to the config. + harshness_level = 1 + + # Currently (May 2025) significance is the only relevant difference metric + # that we have available, if this changes in the future 'diff' could be added + # as a config parameter. 
+ diff = "significance" + + while iteration < max_iters: + print("Finding cuts...", flush=True) + circuit_root = circuit.netl_root + + LabelCircuit(circuit_root, output_significances) + cuts = FindCut(circuit.netl_root, diff_threshold, diff, harshness_level) + + if len(cuts) == 0: + print("No more cuts left to make") + return circuit + + nodes_to_delete = cuts[0] + nodes_to_delete_names = [n.attrib["var"] for n in nodes_to_delete] + + print( + f"Iteration {iteration + 1}: Pruning nodes {nodes_to_delete_names} as a single cut...\n" + ) + [n.set("delete", "yes") for n in nodes_to_delete] + + if config.resynthesis: + resynth_circuit = copy.copy(circuit) + resynth_circuit.resynth() + error = resynth_circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + else: + error = circuit.simulate_and_compute_error( + TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE + ) + + print(f"Pruned circuit error: {error}") + + if error > config.error: + print("Error has overpassed threshold, backtracking...\n") + circuit = _undo_prunes( + circuit, nodes_to_delete, config.error, config.resynthesis + ) + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + break + else: + if config.resynthesis: + circuit = resynth_circuit + + iteration += 1 + os.replace(TEMP_OUTPUT, APPROX_OUTPUT) + + return circuit + + +def _compute_error_metrics( + config: ApproxSynthesisConfig, +) -> tuple[Results, Results | None]: + results: Results = {} + if config.validation is not None: + validation_results = {} + else: + validation_results = None + + for metric in config.metrics: + if metric.is_error_metric(): + error = compute_error(metric.value, EXACT_OUTPUT, APPROX_OUTPUT) + results[metric] = error + if validation_results is not None and config.validation != 0: + error = compute_error( + metric.value, VALIDATION_EXACT_OUTPUT, VALIDATION_APPROX_OUTPUT + ) + validation_results[metric] = error + + return results, validation_results + + +def 
_get_lsbs_up_to(variables: list[str], bit_index: int) -> list[str]: + """ + Get a list of circuit variables with bits up to a specified index. + + Parameters + ---------- + variables : list of str + A list of circuit variable strings, which may include bit indices in the format 'var[bit_index]' or just 'var'. + bit_index : int + The bit index up to which the variables should be included in the result. + + Returns + ------- + list of str + A list of variables that have a bit index less than or equal to the specified bit index, + in the same order as they appear in `variables`. + If a variable does not have a bit index (e.g., 'cin'), it is included in the result. + + Examples + -------- + >>> variables = ["in1[2]", "in1[1]", "in1[0]", "in2[2]", "in2[1]", "in2[0]", "cin"] + >>> _get_lsbs_up_to(variables, 0) + ['in1[0]', 'in2[0]', 'cin'] + + >>> _get_lsbs_up_to(variables, 1) + ['in1[1]', 'in1[0]', 'in2[1]', 'in2[0]', 'cin'] + + >>> _get_lsbs_up_to(["out[0]", "out[1]", "out[2]"], 0) + ['out[0]'] + """ + result = [] + for var in variables: + # Check if the variable is a string and contains a bit index + if "[" in var and "]" in var: + # Extract the bit index from the variable string + start = var.index("[") + 1 + end = var.index("]") + var_bit_index = int(var[start:end]) + + # Check if the variable's bit index is less than or equal to the specified bit index + if var_bit_index <= bit_index: + result.append(var) + else: + # If it's not a variable with a bit index, add it directly (e.g., cin) + result.append(var) + + return result + + +def _create_tbs_and_exact_outputs(config: ApproxSynthesisConfig): + """ + Generate test and validation testbenches along with their exact outputs + based on the provided configuration. + + If a validation fraction is specified, the function splits the dataset into + test and validation sets, creating corresponding testbenches and exact + outputs; otherwise, it creates a single testbench using the full dataset.
+ """ + if config.validation is not None: + # Count the number of inputs in the dataset + with open(config.dataset, "r") as file: + total_lines = sum(1 for _ in file) + + # Create TB that reads only the test dataset + test_dataset_size = int(round((1 - config.validation) * total_lines)) + config.circuit.write_tb( + TB, + config.dataset, + show_progress=config.show_progress, + iterations=test_dataset_size, + ) + config.circuit.exact_output(TB, EXACT_OUTPUT) + + # Create validation TB + validation_dataset_size = int(round(config.validation * total_lines)) + _copy_last_n_lines(config.dataset, VALIDATION_DATASET, validation_dataset_size) + config.circuit.write_tb( + VALIDATION_TB, VALIDATION_DATASET, show_progress=config.show_progress + ) + config.circuit.exact_output(VALIDATION_TB, VALIDATION_EXACT_OUTPUT) + else: + # No validation set, just create a regular TB using the full dataset + config.circuit.write_tb(TB, config.dataset, show_progress=config.show_progress) + config.circuit.exact_output(TB, EXACT_OUTPUT) + + +def _copy_last_n_lines(input_file: str, output_file: str, n: int) -> None: + """ + Copy the last N lines from an input file to an output file. + + Used to create a validation dataset from the original dataset. + """ + with open(input_file, "r") as infile: + last_n_lines = deque(infile, maxlen=n) + + with open(output_file, "w") as outfile: + outfile.writelines(last_n_lines) + + +def _undo_prunes( + circuit, + deleted_nodes: list[ElementTree.Element], + error_threshold: float, + resynthesis: bool, +) -> Circuit: + """ + Sets the "delete" property of the deleted_nodes back to "no", one node at + a time. After each undo the circuit is re-simulated, and the function + returns as soon as the error is below the error_threshold. + Meant for backtracking the last iteration of prunes when the error threshold + is surpassed.
+    """
+    for node in reversed(deleted_nodes):
+        print(f"Undoing prune on node {node.attrib['var']}")
+        node.set("delete", "no")
+        if resynthesis:
+            resynth_circuit = copy.copy(circuit)
+            resynth_circuit.resynth()
+            error = resynth_circuit.simulate_and_compute_error(
+                TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE
+            )
+        else:
+            error = circuit.simulate_and_compute_error(
+                TB, EXACT_OUTPUT, TEMP_OUTPUT, Metric.MEAN_RELATIVE_ERROR_DISTANCE
+            )
+        print(f"New error: {error}")
+        if error < error_threshold:
+            print("Error back to being under threshold, backtracking finished")
+            if resynthesis:
+                return resynth_circuit
+            else:
+                return circuit
+
+    print("Reverted all prunes.")
+    if resynthesis:
+        return resynth_circuit
+    else:
+        return circuit
diff --git a/synthesis.py b/synthesis.py
index 96af95b..1815e9f 100644
--- a/synthesis.py
+++ b/synthesis.py
@@ -44,7 +44,7 @@ def synthesis (rtl, tech, topmodule):
 
     # - - - - - - - - - - - - - - - Execute yosys - - - - - - - - - - - - - - -
 
-    os.system ('yosys synth.ys;')
+    os.system ('yosys -q synth.ys;')
 
     # - - - - - - - - - - - - - Delete temporal Files - - - - - - - - - - - -
 
@@ -77,12 +77,12 @@ def resynthesis(netlist, tech, topmodule):
 
     netlist_path = os.path.dirname(netlist) + "/netlist.v"
 
-    file_text = file_text.replace("[[RTLFILENAME]]", netlist)
+    file_text = file_text.replace("[[RTLFILENAME]]", f'"{netlist}"')
     file_text = file_text.replace("[[TOPMODULE]]", topmodule)
-    file_text = file_text.replace("[[TECHNOLOGY]]", f'{current_dir}/templates/{tech}.v')
-    file_text = file_text.replace("[[NETLIST]]", netlist_path)
-    file_text = file_text.replace("[[LIBRARY]]", f"{current_dir}/templates/{tech}.lib")
-    file_text = file_text.replace("[[LIBRARYABC]]", f"{current_dir}/templates/{tech}.lib")
+    file_text = file_text.replace("[[TECHNOLOGY]]", f'"{current_dir}/templates/{tech}.v"')
+    file_text = file_text.replace("[[NETLIST]]", f'"{netlist_path}"')
+    file_text = file_text.replace("[[LIBRARY]]", f'"{current_dir}/templates/{tech}.lib"')
+    file_text = file_text.replace("[[LIBRARYABC]]", f'"{current_dir}/templates/{tech}.lib"')
 
     file = open('resynth.ys',"w")
     file.write(file_text)
@@ -90,7 +90,7 @@ def resynthesis(netlist, tech, topmodule):
 
     # - - - - - - - - - - - - - - - Execute yosys - - - - - - - - - - - - - - -
 
-    os.system ('yosys resynth.ys;')
+    os.system ('yosys -q resynth.ys;')
 
     # - - - - - - - - - - - - - Delete temporal Files - - - - - - - - - - - -
 
@@ -134,7 +134,7 @@ def ys_get_area(netlist, tech, topmodule):
 
     # - - - - - - - - - - - - - - - Execute yosys - - - - - - - - - - - - - - -
 
-    os.system (f'yosys stat.ys -l \"{yosys_log_path}\"')
+    os.system (f'yosys -q stat.ys -l \"{yosys_log_path}\"')
 
     # - - - - - - - - - - - - - - - Parse Area - - - - - - - - - - - - - - -
 
diff --git a/utils.py b/utils.py
index 5b950f0..93f6c41 100644
--- a/utils.py
+++ b/utils.py
@@ -4,7 +4,7 @@ import string
 import numpy as np
 import math
 
-from random import uniform, gauss, triangular
+from random import randrange, gauss, triangular
 
 def get_name(length):
     timestamp = datetime.now().strftime("%H%M%S")
@@ -27,6 +27,10 @@ def get_random(bits: int, distribution='uniform', samples=1, **kwargs):
             "gaussian" or "normal" for a normal distribution.
             "uniform" or "rectangular" for a uniform distribution.
             "triangular" for a triangular distribution.
+            "shuffle_bag": Uniform, non-repeating values using a shuffle bag
+                           algorithm. Should not be used for a lot of input bits,
+                           see `Circuit.generate_dataset` docs for a detailed
+                           explanation why.
             TODO: Add more distributions
     samples: int
         Number of samples.
@@ -66,7 +70,13 @@ def get_random(bits: int, distribution='uniform', samples=1, **kwargs):
     '''Distributions case'''
     data=[]
     if distribution in {'uniform', 'rectangular'}:
-        data=(int(math.floor(uniform(low_limit,high_limit))) for _ in range(samples))
+        data=(randrange(low_limit, high_limit) for _ in range(samples))
+
+    # TODO: There's an issue with the `triangular` and `gauss` methods: because
+    # they return floats, they generate values where only around the 50 MSBs
+    # can be non-zero, which makes them unsuitable for larger circuits that
+    # can have 64, 128, or even more input bits.
+
     elif distribution=='triangular':
         data=(int(math.floor(triangular(low_limit,high_limit,mode=median))) for _ in range(samples))
     elif distribution in {'normal', 'gaussian'}:
@@ -74,12 +84,25 @@ def get_random(bits: int, distribution='uniform', samples=1, **kwargs):
             random_value=int(math.floor(gauss(median,variance)))
             if low_limit<=random_value<=high_limit:
                 data.append(random_value)
+    elif distribution == 'shuffle_bag':
+        range_size = high_limit - low_limit
+        num_cycles = math.ceil(samples / range_size)
+
+        for i in range(num_cycles):
+            bag = list(range(low_limit, high_limit))
+            random.shuffle(bag)
+
+            samples_remaining = samples - len(data)
+            if samples_remaining > range_size:
+                data.extend(bag)
+            else:
+                data.extend(bag[0:samples_remaining])
     else:
         raise ValueError(f'{distribution} is not a valid distribution name')
 
     return data
 
-def read_dataset(filename, base, max_lines=None):
+def read_dataset(filename: str, base: int, max_lines: None | int =None) -> list[list[int]]:
     """
     Reads a dataset or circuit output file like those generated by the
     `Circuit.generate_dataset` file or `Circuit.exact_output`.
@@ -96,12 +119,22 @@ def read_dataset(filename, base, max_lines=None):
     max_lines : None | int
         The maximum amount of lines to read, in case the user doesn't want to
         use the entire dataset.
+
+    Returns
+    ----------
+    dataset : list[list[int]]
+        The data read from the file. The returned list is ordered by rows first,
+        then columns. For example, dataset[2][5] is the element at index 5 of
+        the row at index 2 (both zero-based).
     """
     with open(filename, "r") as f:
         if max_lines is not None:
-            return [
+            values = [
                 [int(x, base) for x in line.split()]
                 for _, line in zip(range(max_lines), f)
             ]
         else:
-            return [[int(x, base) for x in line.split()] for line in f]
+            values = [[int(x, base) for x in line.split()] for line in f]
+
+    # Filter out empty lines
+    return [value for value in values if value]
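
One consequence of the patched `read_dataset` is worth spelling out: the empty-row filter runs after the `max_lines` cut, so blank lines still count toward `max_lines` and the call can return fewer rows than requested. The block below is a minimal, self-contained sketch of that behaviour. `read_dataset_sketch` and `demo` are hypothetical names that copy the logic from the hunk above instead of importing it from `utils.py`, and the sample file contents are made up for illustration.

```python
from __future__ import annotations

import os
import tempfile


def read_dataset_sketch(
    filename: str, base: int, max_lines: int | None = None
) -> list[list[int]]:
    """Hypothetical stand-in that mirrors the patched read_dataset logic."""
    with open(filename, "r") as f:
        if max_lines is not None:
            # zip() stops after max_lines physical lines, blank or not.
            values = [
                [int(x, base) for x in line.split()]
                for _, line in zip(range(max_lines), f)
            ]
        else:
            values = [[int(x, base) for x in line.split()] for line in f]

    # A blank line parses to an empty list, which is dropped here.
    return [value for value in values if value]


def demo() -> tuple[list[list[int]], list[list[int]]]:
    """Write a small hex dataset with a blank line and read it two ways."""
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
        tmp.write("0a ff\n\n10 01\n")
        path = tmp.name
    try:
        full = read_dataset_sketch(path, 16)
        limited = read_dataset_sketch(path, 16, max_lines=2)
        return full, limited
    finally:
        os.remove(path)


if __name__ == "__main__":
    full, limited = demo()
    print(full)
    print(limited)
```

In this sketch the second call reads two physical lines, one of which is blank, so only a single row survives the filter. Callers that need exactly `max_lines` rows would have to strip blank lines from the dataset beforehand.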