Source code for core.benchmarks.Benchmark

from __future__ import annotations

import os
from abc import ABC, abstractmethod
from functools import reduce
from pathlib import Path
from typing import Dict, Any, Set, List

from blifparser.blifparser import BlifParser
from blifparser.keywords.generic import Blif, Names
from networkx import topological_sort, DiGraph
from z3 import Bool

from Loggable import Loggable
from utils import config
from utils.PLAParser import PLA, PLAParser
from utils.VerilogParser import VerilogModule, VerilogParser
from core.BooleanFunction import BooleanFunction
from core.IOInterface import IOInterface
from core.benchmarks.Formula import Formula, VerilogFormula
from core.expressions.BooleanExpression import LITERAL


[docs] class Benchmark(BooleanFunction, Loggable, IOInterface, ABC): """ An abstract class to represent a benchmark. A benchmark is a multi-output Boolean function. """ def __init__(self, file_path: Path = None, name: str = None): """ A benchmark has optionally a file path and a name. :param file_path: Optionally, a file path for the benchmark. :param name: Optionally, a name for the benchmark. """ super().__init__() self.file_path = file_path self._set_name(file_path, name)
[docs] def get_log(self) -> List[Dict[str, Any]] | Dict[str, Any]: return { "type": self.__class__.__name__, "inputs": len(self.get_input_variables()), "outputs": len(self.get_output_variables()), "wires": len(self.get_auxiliary_variables()) }
def _set_name(self, file_path: Path, name: str): """ Sets the name for the benchmark based on the given file path or the given name. If no name is provided, then the stem of the file path is used as name for this benchmark. Otherwise, the given name is used for this benchmark. :param file_path: The file path of the benchmark. :param name: The name of the benchmark. :return: """ if name is None: if file_path is None: self.name = "" else: self.name = file_path.stem else: self.name = name def _abc_conversion(self, original_filename: str, new_filename: str, write_cmd: str): import time import pexpect abc_original_path = config.abc_path.joinpath(original_filename) abc_new_path = config.abc_path.joinpath(new_filename) self.write(abc_original_path) time.sleep(2) process = pexpect.spawn(config.abc_cmd, cwd=str(config.abc_path)) process.sendline('read "{}"; {} "{}";'.format(original_filename, write_cmd, new_filename)) while not abc_new_path.exists(): pass time.sleep(2) benchmark = Benchmark.read(abc_new_path) os.remove(abc_original_path) return benchmark
[docs] @abstractmethod def to_blif(self) -> BLIFBenchmark: pass
[docs] @abstractmethod def to_pla(self) -> PLABenchmark: pass
[docs] @abstractmethod def to_verilog(self) -> VerilogBenchmark: pass
[docs] class BLIFBenchmark(Benchmark, Loggable): """ A class to represent a BLIF benchmark. """ def __init__(self, blif: Blif, file_path: Path = None, name: str = None): """ A BLIF benchmark has the BLIF content, and optionally a file path and a name. :param blif: The BLIF content. :param file_path: Optionally, a file path for the benchmark. :param name: Optionally, a name for the benchmark. """ super().__init__(file_path, name) self.blif = blif self.functions = self._get_functions()
[docs] @staticmethod def get_file_extension() -> str: return "blif"
[docs] def get_input_variables(self) -> Set[str]: return self.blif.inputs.inputs
[docs] def get_output_variables(self) -> Set[str]: return self.blif.outputs.outputs
[docs] def get_auxiliary_variables(self) -> Set[str]: output_variables = self.get_output_variables() auxiliary_variables = set() for bf in self.blif.booleanfunctions: if bf.output not in output_variables: auxiliary_variables.add(bf.output) return auxiliary_variables
[docs] def model_name(self) -> str: """ Returns the name of this BLIF benchmark. :return: The name of this BLIF benchmark. """ if self.name: return self.name return self.blif.model.name
[docs] @staticmethod def from_string(content: str) -> BooleanFunction: raise NotImplementedError()
[docs] @staticmethod def read(file_path: Path) -> BLIFBenchmark: """ Reads a BLIF benchmark from the file with the given file path. :param file_path: The path of the BLIF file. :return: A BLIF benchmark. """ parser = BlifParser(str(file_path)) blif = parser.blif name = blif.model.name return BLIFBenchmark(blif, file_path, name)
[docs] def to_file_path(self, file_name: str): return Path(file_name + '.' + self.get_file_extension())
[docs] def to_string(self) -> str: content = "" content += ".model {}\n".format(self.model_name()) content += ".inputs {}\n".format(" ".join(self.get_input_variables())) content += ".outputs {}\n".format(" ".join(self.get_output_variables())) for _, names in self.functions.items(): content += str(names) content += ".end" return content
def _get_functions(self) -> Dict[str, Names]: """ Returns the wordlines of this BLIF benchmark. Here, a function is a dictionary with the output as key and the names as value. :return: A dictionary mapping the output to the names. """ functions = dict() for bf in self.blif.booleanfunctions: functions[bf.output] = bf return functions
[docs] def eval(self, instance: Dict[str, bool]) -> Dict[str, bool]: evaluations = dict() evaluations.update(instance) graph = self.to_data_flow_graph() for node in topological_sort(graph): # If the node is a primary input variable, then we must not evaluate it. if node in self.get_input_variables(): continue else: names = self.functions.get(node) inputs = names.inputs truthtable = names.truthtable line_evaluations = [] for line in truthtable: line_evaluation = [] for i in range(len(inputs)): variable = inputs[i] variable_value = evaluations.get(variable) raw_truthtable_value = line[i] if raw_truthtable_value == "~" or raw_truthtable_value == "-": continue elif raw_truthtable_value == "0": truthtable_value = False elif raw_truthtable_value == "1": truthtable_value = True else: raise Exception("Unknown truth value for input variable {}".format(variable)) line_evaluation.append(variable_value == truthtable_value) line_evaluations.append(reduce(lambda x, y: x and y, line_evaluation)) evaluation = reduce(lambda x, y: x or y, line_evaluations) evaluations[node] = evaluation # We remove the primary input variables and the auxiliary variables from the evaluation # such that we only retain the primary output variables. for input_variable in self.get_input_variables(): evaluations.pop(input_variable) for auxiliary_variable in self.get_auxiliary_variables(): evaluations.pop(auxiliary_variable) return evaluations
[docs] def to_data_flow_graph(self) -> DiGraph(): """ Returns a data flow graph of this BLIF benchmark. The leaf nodes are the primary input variables, and the root nodes are the output variables. """ graph = DiGraph() for output_variable, names in self.functions.items(): graph.add_node(output_variable) for input_variable in names.inputs: graph.add_node(input_variable) graph.add_edge(input_variable, output_variable) return graph
[docs] def to_blif(self) -> BLIFBenchmark: return self
[docs] def to_pla(self) -> PLABenchmark: self._abc_conversion("x.blif", "x.pla", "write_pla") pla_benchmark = PLABenchmark.read(config.abc_path.joinpath("x.pla")) abc_new_path = config.abc_path.joinpath("x.pla") os.remove(abc_new_path) return pla_benchmark
[docs] def to_verilog(self) -> VerilogBenchmark: self._abc_conversion("x.blif", "x.v", "write_verilog") verilog_benchmark = VerilogBenchmark.read(config.abc_path.joinpath("x.v")) abc_new_path = config.abc_path.joinpath("x.v") os.remove(abc_new_path) return verilog_benchmark
[docs] class PLABenchmark(Benchmark): """ A class to represent a PLA benchmark. """ def __init__(self, pla: PLA, file_path: Path = None, name: str = None): """ A PLA benchmark has the PLA content, and optionally a file path and auxiliary variables. :param pla: The PLA content. :param file_path: Optionally, a file path for the benchmark. :param name: Optionally, a name for the benchmark. """ self.pla = pla super().__init__(file_path, name)
[docs] def get_file_extension(self) -> str: return "pla"
[docs] def get_input_variables(self) -> Set[str]: return set(self.pla.inputs)
[docs] def get_output_variables(self) -> Set[str]: return set(self.pla.outputs)
[docs] def get_auxiliary_variables(self) -> Set[str]: return set()
[docs] @staticmethod def from_string(content: str) -> BooleanFunction: raise NotImplementedError()
[docs] @staticmethod def read(file_path: Path) -> PLABenchmark: pla_parser = PLAParser(file_path) pla = pla_parser.pla return PLABenchmark(pla, file_path=file_path)
[docs] def to_string(self) -> str: content = "" content += ".i {}\n".format(len(self.get_input_variables())) content += ".o {}\n".format(len(self.get_output_variables())) content += ".ilb {}\n".format(" ".join(self.pla.inputs)) content += ".ob {}\n".format(" ".join(self.pla.outputs)) content += ".p {}\n".format(len(self.pla.truthtable)) for (input_vector, output_vector) in self.pla.truthtable: content += "{} {}\n".format("".join(input_vector), "".join(output_vector)) content += ".e" return content
def _satisfies(self, input_vector: List[str], instance: Dict[str, bool]) -> bool: """ Auxiliary function for the evaluation of a PLA benchmark. Returns true if and only if the given input vector satisfies the given instance. An input vector satisfies the given instance when there is no conflict among the truth values. A conflict arises when we have a pair of (True, False) or (False, True) for the input vector and the instance, i.e. when input_variable_value != truth_table_value. :param input_vector: The input vector of the truth table of this PLA benchmark. :param instance: The given input vector. :return: """ # We iterate over each input variable for i in range(len(self.pla.inputs)): input_variable = self.pla.inputs[i] # We get the truth value from the instance input_variable_value = instance.get(input_variable) # We get the truth value from the truth table of this PLA benchmark raw_truthtable_value = input_vector[i] if raw_truthtable_value == "~" or raw_truthtable_value == "-": continue elif raw_truthtable_value == "0": truth_table_value = False elif raw_truthtable_value == "1": truth_table_value = True else: raise Exception("Unknown truth value for input variable {}".format(input_variable)) if input_variable_value != truth_table_value: return False return True
[docs] def eval(self, instance: Dict[str, bool]) -> Dict[str, bool]: evaluations = dict() # Initially, every output variable is False for output_variable in self.get_output_variables(): evaluations[output_variable] = False # We iterate over the lines in the truth table for line in self.pla.truthtable: input_vector, output_vector = line # When the input vector of the line satisfies the given instance, # then we will determine the evaluation of the output variables # based on the corresponding output vector. if self._satisfies(input_vector, instance): # We iterate over the output variables in the line for i in range(len(self.pla.outputs)): output_variable = self.pla.outputs[i] raw_truthtable_value = output_vector[i] # When we find that an output variable evaluates to true, we use it for the evaluation. # An output variable evaluating to True cannot be undone, i.e. when the entry is False, # we leave the evaluation untouched. if raw_truthtable_value == "~" or raw_truthtable_value == "-": continue elif raw_truthtable_value == "0": continue elif raw_truthtable_value == "1": evaluations[output_variable] = True else: raise Exception("Unknown truth value for input variable {}".format(output_variable)) return evaluations
[docs] def to_blif(self) -> BLIFBenchmark: self._abc_conversion("x.pla", "x.blif", "write_blif") blif_benchmark = BLIFBenchmark.read(config.abc_path.joinpath("x.blif")) abc_new_path = config.abc_path.joinpath("x.blif") os.remove(abc_new_path) return blif_benchmark
[docs] def to_pla(self) -> PLABenchmark: return self
[docs] def to_verilog(self) -> VerilogBenchmark: self._abc_conversion("x.pla", "x.v", "write_verilog") verilog_benchmark = VerilogBenchmark.read(config.abc_path.joinpath("x.v")) abc_new_path = config.abc_path.joinpath("x.v") os.remove(abc_new_path) return verilog_benchmark
[docs] class VerilogBenchmark(Benchmark): """ A class to represent a Verilog benchmark. """ def __init__(self, verilog: VerilogModule, file_path: Path = None, name: str = None): """ A Verilog benchmark has the Verilog content, and optionally a file path and name. :param verilog: The Verilog content. :param file_path: Optionally, a file path for the benchmark. :param name: Optionally, a name for the benchmark. """ super().__init__(file_path, name) self.verilog = verilog
[docs] @staticmethod def get_file_extension() -> str: return "v"
def __copy__(self): return VerilogBenchmark(self.verilog.copy(), self.file_path, self.name)
[docs] def get_input_variables(self) -> Set[str]: return self.verilog.inputs
[docs] def get_output_variables(self) -> Set[str]: return self.verilog.outputs
[docs] def get_auxiliary_variables(self) -> Set[str]: return self.verilog.wires
# TODO: Fix! This can be achieved using VerilogFix in aux folder.
[docs] def fix(self, atom: str, positive: bool) -> VerilogBenchmark: raise NotImplementedError()
[docs] def negate(self) -> VerilogBenchmark: """ Returns the negation of this benchmark. """ negated_functions = dict() for variable, formula in self.verilog.functions.items(): if variable in self.get_output_variables(): negated_formula = formula.negate() negated_functions[variable] = negated_formula else: negated_functions[variable] = formula name = self.name + "_neg" input_variables = self.get_input_variables() output_variables = self.get_output_variables() auxiliary_variables = self.get_auxiliary_variables() file_path = self.file_path verilog = VerilogModule() verilog.module_name = name verilog.inputs = input_variables verilog.wires = auxiliary_variables verilog.outputs = output_variables verilog.functions = negated_functions return VerilogBenchmark(verilog, file_path, name)
[docs] @staticmethod def from_string(content: str) -> BooleanFunction: raise NotImplementedError()
[docs] @staticmethod def read(file_path: Path) -> VerilogBenchmark: verilog_parser = VerilogParser(file_path) verilog = verilog_parser.verilog_module return VerilogBenchmark(verilog, file_path, verilog.module_name)
[docs] def eval(self, instance: Dict[str, bool]) -> Dict[str, bool]: evaluations = dict() evaluations.update(instance) graph = self._to_data_flow_graph() for node in topological_sort(graph): # If the node is a primary input variable, then we must not evaluate it. if node in self.get_input_variables(): continue else: formula = self.verilog.functions.get(node) evaluations[node] = formula.eval(evaluations).get(node) # We remove the primary input variables and the auxiliary variables from the evaluation # such that we only retain the primary output variables. for input_variable in self.get_input_variables(): evaluations.pop(input_variable) for auxiliary_variable in self.get_auxiliary_variables(): evaluations.pop(auxiliary_variable) return evaluations
def _to_data_flow_graph(self) -> DiGraph(): """ Returns a data flow graph of this Verilog benchmark. The leaf nodes are the primary input variables, and the root nodes are the output variables. """ graph = DiGraph() for output_variable, formula in self.verilog.functions.items(): graph.add_node(output_variable) for input_variable in formula.get_input_variables(): graph.add_node(input_variable) graph.add_edge(input_variable, output_variable) return graph
[docs] def to_string(self) -> str: input_variables = ", ".join(sorted(self.get_input_variables())) output_variables = ", ".join(sorted(self.get_output_variables())) content = "module {} (".format(self.verilog.module_name) content += "\n" content += "\t{}, {});".format(input_variables, output_variables) content += "\n" content += "\tinput {};".format(input_variables) content += "\n" content += "\toutput {};".format(output_variables) content += "\n" if len(self.get_auxiliary_variables()) > 0: auxiliary_variables = ", ".join(sorted(self.get_auxiliary_variables())) content += "\twire {};".format(auxiliary_variables) content += "\n" for function_name, formula in self.verilog.functions.items(): content += "\tassign {};".format(formula) content += "\n" content += "endmodule" return content
[docs] def collapse(self) -> VerilogBenchmark: data_flow_graph = self._to_data_flow_graph() # From primary input variable to output variable node_to_expression = dict() for node in topological_sort(data_flow_graph): if len(data_flow_graph.in_edges(node)) == 0: node_to_expression[node] = LITERAL(node, True) else: formula = self.verilog.functions.get(node) boolean_expression = formula.verilog.boolean_expression # print("Node: {}".format(node)) # print(boolean_expression) for (child_node, _) in data_flow_graph.in_edges(node): child_expression = node_to_expression.get(child_node) # print("\t{} -> \t{}".format(child_node, child_expression)) boolean_expression = boolean_expression.substitute(LITERAL(child_node, True), child_expression) node_to_expression[node] = boolean_expression functions = dict() for output_variable in self.get_output_variables(): verilog = VerilogFormula() verilog.output = output_variable verilog.boolean_expression = node_to_expression.get(output_variable) formula = Formula(verilog) functions[output_variable] = formula verilog_module = VerilogModule() verilog_module.module_name = self.verilog.module_name verilog_module.inputs = self.verilog.inputs verilog_module.outputs = self.verilog.outputs verilog_module.wires = set() verilog_module.functions = functions return VerilogBenchmark(verilog_module)
[docs] def to_z3(self) -> Dict[str, Bool]: verilog_benchmark = self.collapse() functions = dict() for output_variable, formula in verilog_benchmark.verilog.functions.items(): functions[output_variable] = formula.verilog.boolean_expression.to_z3() return functions
[docs] def to_blif(self, collapse: bool = True) -> BLIFBenchmark: self._abc_conversion("x.v", "x.blif", "write_blif") blif_benchmark = BLIFBenchmark.read(config.abc_path.joinpath("x.blif")) abc_new_path = config.abc_path.joinpath("x.blif") os.remove(abc_new_path) return blif_benchmark
[docs] def to_pla(self) -> PLABenchmark: self._abc_conversion("x.v", "x.pla", "write_pla") pla_benchmark = PLABenchmark.read(config.abc_path.joinpath("x.pla")) abc_new_path = config.abc_path.joinpath("x.pla") os.remove(abc_new_path) return pla_benchmark
[docs] def to_verilog(self) -> VerilogBenchmark: return self