add automatic validation process with PIM simulator
This commit is contained in:
200
validation/onnx_utils.py
Normal file
200
validation/onnx_utils.py
Normal file
@@ -0,0 +1,200 @@
|
||||
import csv
|
||||
import onnx
|
||||
import json
|
||||
import pathlib
|
||||
import numpy as np
|
||||
from onnx import TensorProto
|
||||
|
||||
# Map ONNX TensorProto element types to the NumPy dtypes used when
# generating and serializing input data.  Types NumPy cannot represent
# natively are stored in a substitute dtype (see inline notes).
_ONNX_TO_NP = {
    TensorProto.FLOAT: np.float32,
    TensorProto.DOUBLE: np.float64,
    TensorProto.INT64: np.int64,
    TensorProto.INT32: np.int32,
    TensorProto.UINT8: np.uint8,
    TensorProto.INT8: np.int8,
    TensorProto.BOOL: np.uint8,  # store as 0/1 bytes
    TensorProto.FLOAT16: np.float16,  # generate in f32 then cast
    TensorProto.BFLOAT16: getattr(np, "bfloat16", np.float32),  # cast if available
}
|
||||
|
||||
|
||||
def onnx_io(path):
    """Load an ONNX model and describe its graph inputs and outputs.

    Returns two lists ``(inputs, outputs)`` of tuples
    ``(index, name, elem_type, shape)`` in graph declaration order.
    Dimensions without a concrete ``dim_value`` (dynamic dims) are
    reported as 1; a tensor type with no ``shape`` field yields ``[]``.
    """
    graph = onnx.load(path).graph

    def _dims(tensor_type):
        # No "shape" field at all -> scalar-like, report an empty shape.
        if not tensor_type.HasField("shape"):
            return []
        # Dynamic/unknown dims default to 1.
        return [
            int(d.dim_value) if d.HasField("dim_value") else 1
            for d in tensor_type.shape.dim
        ]

    def _describe(value_infos):
        described = []
        for index, value in enumerate(value_infos):
            tt = value.type.tensor_type
            described.append((index, value.name, tt.elem_type, _dims(tt)))
        return described

    return _describe(graph.input), _describe(graph.output)
|
||||
|
||||
|
||||
def onnx_io_bitsize(io):
    """Return the size in bits of the tensor described by an ``onnx_io`` tuple.

    Params
    ------
    io:
        ``(index, name, elem_type, shape)`` tuple as produced by ``onnx_io``.

    Returns
    -------
    Element count * NumPy element size * 8.

    Notes
    -----
    An empty shape (scalar-like input) counts as one element; the previous
    implementation started from ``shape[0]`` and raised IndexError on it.
    """
    _idx, _name, elem_type, shape = io
    num_elements = 1
    for dim in shape:
        num_elements *= dim
    return num_elements * _ONNX_TO_NP[elem_type]().itemsize * 8
|
||||
|
||||
|
||||
# Integer dtypes for which inclusive bounds can be computed.
_INT_DTYPES = (
    np.int8, np.int16, np.int32, np.int64,
    np.uint8, np.uint16, np.uint32, np.uint64,
)


def _dtype_bounds(np_dtype):
    """Return (min, max) inclusive bounds for integer dtypes; None for floats.

    The original had two byte-identical branches for signed and unsigned
    integers; they are merged into a single membership test.
    """
    if np_dtype in _INT_DTYPES:
        info = np.iinfo(np_dtype)
        return int(info.min), int(info.max)
    return None
|
||||
|
||||
|
||||
def gen_random_inputs(
    onnx_inputs,
    *,
    shape_overrides: dict | None = None,
    float_range: tuple[float, float] = (-1.0, 1.0),
    int_range: tuple[int, int] = (-3, 3),
    dyn_dim_default: int = 1,
    seed: int | None = None,
):
    """
    Generate random NumPy arrays for each ONNX input.

    Params
    ------
    onnx_inputs:
        Iterable of (index, name, elem_type, shape) tuples, as produced by
        ``onnx_io`` (presumably — confirm against caller).
    shape_overrides:
        Dict mapping input index OR input name -> tuple/list of dims.
        Overrides the shape inferred from the model (useful for dynamic dims).
        Index overrides take precedence over name overrides.
    float_range:
        Range for floats (uniform).
    int_range:
        Range for integers (uniform, inclusive of both low and high; the
        range is clamped to the target dtype's representable bounds).
    dyn_dim_default:
        If a dim is dynamic/unknown, use this value (unless shape_overrides provides one).
    seed:
        RNG seed for reproducibility.

    Returns
    -------
    inputs_list: list[np.ndarray]
        Arrays in graph input order (index-sorted).
    inputs_dict: dict[str, np.ndarray]
        Mapping input_name -> array in the ONNX-declared dtype.

    Raises
    ------
    ValueError
        If an input's ONNX element type has no NumPy mapping.
    """
    rng = np.random.default_rng(seed)
    ins = onnx_inputs

    # Normalize overrides to support both index and name keys.
    shape_overrides = shape_overrides or {}
    name_overrides = {k: tuple(v) for k, v in shape_overrides.items() if isinstance(k, str)}
    idx_overrides = {int(k): tuple(v) for k, v in shape_overrides.items() if isinstance(k, int)}

    arrays_by_name = {}
    arrays_in_order = []

    for idx, name, elem_type, shape in ins:
        # Resolve dtype
        if elem_type not in _ONNX_TO_NP:
            raise ValueError(f"Unsupported ONNX dtype for input '{name}': {elem_type}")
        np_dtype = _ONNX_TO_NP[elem_type]

        # Resolve shape: model -> replace unknowns with dyn_dim_default -> apply overrides
        resolved_shape = list(shape or [])
        if not resolved_shape:
            resolved_shape = [dyn_dim_default]  # scalar-like: treat as 1-dim with size dyn_dim_default

        # If your onnx_io already sets unknown dims to 1, we still allow overriding:
        if idx in idx_overrides:
            resolved_shape = list(idx_overrides[idx])
        elif name in name_overrides:
            resolved_shape = list(name_overrides[name])

        # Make sure no zeros or negative dims survive into the final shape.
        resolved_shape = [int(d if d and d > 0 else dyn_dim_default) for d in resolved_shape]
        size = int(np.prod(resolved_shape))

        # Generate data
        if np.issubdtype(np_dtype, np.floating):
            lo, hi = float_range
            # f16/bf16 are drawn in float32 first, then cast down below.
            # (When np.bfloat16 is absent, the getattr fallback makes the
            # membership test a harmless float32-vs-float32 comparison.)
            base_dtype = np.float32 if np_dtype in (np.float16, getattr(np, "bfloat16", np.float32)) else np_dtype
            arr = rng.uniform(lo, hi, size=size).astype(base_dtype).reshape(resolved_shape)
            # cast to f16/bf16 if required
            if np_dtype is np.float16:
                arr = arr.astype(np.float16)
            elif getattr(np, "bfloat16", None) is not None and np_dtype is np.bfloat16:
                arr = arr.astype(np.bfloat16)
        elif np_dtype == np.uint8 and elem_type == TensorProto.BOOL:
            # Bool as 0/1 bytes (uint8 is an integer dtype, so this check
            # must come before the generic integer branch).
            arr = (rng.random(size=size) < 0.5).astype(np.uint8).reshape(resolved_shape)
        elif np.issubdtype(np_dtype, np.integer):
            lo, hi = int_range
            # Clamp the requested range to what the dtype can actually hold.
            bounds = _dtype_bounds(np_dtype)
            if bounds is not None:
                lo = max(lo, bounds[0])
                hi = min(hi, bounds[1])
            # Generator.integers excludes `high` by default; +1 makes hi inclusive.
            arr = rng.integers(lo, hi + 1, size=size, dtype=np_dtype).reshape(resolved_shape)
        else:
            raise ValueError(f"Unhandled dtype mapping for input '{name}' (elem_type={elem_type}).")

        arrays_by_name[name] = arr
        arrays_in_order.append(arr)
    return arrays_in_order, arrays_by_name
|
||||
|
||||
|
||||
def save_inputs_to_files(onnx_path, arrays_in_order, out_dir):
    """
    Save arrays to CSV files. Returns (flags, files) where flags is a list
    like ["--in0-csv-file", "...", "--in0-shape", "Dx...xD", ...]
    and files is the list of created paths.
    """
    target_dir = pathlib.Path(out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    model_inputs, _ = onnx_io(onnx_path)
    flags, files = [], []

    for idx, _name, _et, _shape in model_inputs:
        array = arrays_in_order[idx]
        csv_path = target_dir / f"in{idx}.csv"

        # Write row-major values, comma-separated.
        with open(csv_path, "w", newline="") as handle:
            writer = csv.writer(handle)
            if array.ndim == 2:
                # One CSV row per matrix row.
                for row in array:
                    writer.writerow(row.reshape(-1))
            else:
                # Any other rank: a single flattened row for clarity.
                writer.writerow(array.flatten())

        dims = "x".join(str(d) for d in array.shape)
        flags.extend([f"--in{idx}-csv-file", str(csv_path), f"--in{idx}-shape", dims])
        files.append(str(csv_path))

    return flags, files
|
||||
|
||||
|
||||
def write_inputs_to_memory_bin(memory_bin_path, config_json_path, arrays_in_order):
    """Overwrite input regions in memory.bin at addresses from config.json.

    Params
    ------
    memory_bin_path:
        Existing binary file; patched in place (opened "r+b").
    config_json_path:
        JSON file containing an "inputs_addresses" list of byte offsets,
        one per input array, in graph input order.
    arrays_in_order:
        NumPy arrays to write; each is serialized as row-major (C-order)
        bytes in native byte order at its corresponding address.

    Raises
    ------
    ValueError
        If the number of addresses does not match the number of arrays.
    """
    with open(config_json_path) as f:
        config = json.load(f)

    input_addresses = config["inputs_addresses"]
    # Validate with a real exception instead of `assert`, which is
    # silently stripped when Python runs with -O.
    if len(input_addresses) != len(arrays_in_order):
        raise ValueError(
            f"Address/input count mismatch: {len(input_addresses)} vs {len(arrays_in_order)}"
        )

    with open(memory_bin_path, "r+b") as f:
        for addr, arr in zip(input_addresses, arrays_in_order):
            # Force native byte order so the simulator reads values correctly.
            native = arr.astype(arr.dtype.newbyteorder("="), copy=False)
            f.seek(addr)
            f.write(native.tobytes(order="C"))
||||
Reference in New Issue
Block a user