"""Utilities for generating random inputs for ONNX models and persisting them
as CSV files or by patching a raw memory image."""

import csv
import json
import math
import pathlib

import numpy as np
import onnx
from onnx import TensorProto

# Map ONNX element types to the NumPy dtypes used for host-side generation.
_ONNX_TO_NP = {
    TensorProto.FLOAT: np.float32,
    TensorProto.DOUBLE: np.float64,
    TensorProto.INT64: np.int64,
    TensorProto.INT32: np.int32,
    TensorProto.UINT8: np.uint8,
    TensorProto.INT8: np.int8,
    TensorProto.BOOL: np.uint8,  # store as 0/1 bytes
    TensorProto.FLOAT16: np.float16,  # generate in f32 then cast
    TensorProto.BFLOAT16: getattr(np, "bfloat16", np.float32),  # cast if available
}


def onnx_io(path):
    """Load the ONNX model at *path* and describe its graph inputs/outputs.

    Returns
    -------
    (ins, outs):
        Two lists of tuples ``(index, name, elem_type, shape)``.  ``elem_type``
        is the raw ``TensorProto`` enum value; dynamic/symbolic dims are
        reported as 1.
    """
    model = onnx.load(path)
    graph = model.graph

    def shp(tensor_type):
        dims = []
        if tensor_type.HasField("shape"):
            for d in tensor_type.shape.dim:
                # dim_value / dim_param form a oneof; fall back to 1 for
                # symbolic or unknown dimensions.
                dims.append(int(d.dim_value) if d.HasField("dim_value") else 1)
        return dims

    ins, outs = [], []
    for i, v in enumerate(graph.input):
        t = v.type.tensor_type
        ins.append((i, v.name, t.elem_type, shp(t)))
    for i, v in enumerate(graph.output):
        t = v.type.tensor_type
        outs.append((i, v.name, t.elem_type, shp(t)))
    return ins, outs


def onnx_io_bitsize(io):
    """Return the size in bits of one IO tuple produced by :func:`onnx_io`.

    Fix: the original indexed ``shape[0]`` unconditionally, which raised
    IndexError for scalars (empty shape).  ``math.prod`` of an empty sequence
    is 1, so a scalar correctly counts as one element.
    """
    _idx, _name, elem_type, shape = io
    num_elements = math.prod(shape)
    # np.dtype(...).itemsize avoids instantiating a throwaway scalar.
    return num_elements * np.dtype(_ONNX_TO_NP[elem_type]).itemsize * 8


def _dtype_bounds(np_dtype):
    """Return (min, max) inclusive bounds for integer dtypes; None for floats."""
    # Original had two identical branches for signed and unsigned ints;
    # np.issubdtype covers both in one check.
    if np.issubdtype(np_dtype, np.integer):
        info = np.iinfo(np_dtype)
        return int(info.min), int(info.max)
    return None


def gen_random_inputs(
    onnx_inputs,
    *,
    shape_overrides: dict | None = None,
    float_range: tuple[float, float] = (-1.0, 1.0),
    int_range: tuple[int, int] = (-3, 3),
    dyn_dim_default: int = 1,
    seed: int | None = None,
):
    """
    Generate random NumPy arrays for each ONNX input.

    Params
    ------
    shape_overrides:
        Dict mapping input index OR input name -> tuple/list of dims.
        Overrides the shape inferred from the model (useful for dynamic dims).
    float_range:
        Range for floats (uniform).
    int_range:
        Range for integers (uniform integers, inclusive of low/high).
    dyn_dim_default:
        If a dim is dynamic/unknown, use this value (unless shape_overrides
        provides one).
    seed:
        RNG seed for reproducibility.

    Returns
    -------
    inputs_list: list[np.ndarray]
        Arrays in graph input order (index-sorted).
    inputs_dict: dict[str, np.ndarray]
        Mapping input_name -> array in the ONNX-declared dtype.
    """
    rng = np.random.default_rng(seed)

    # Normalize overrides to support both index and name keys.
    shape_overrides = shape_overrides or {}
    name_overrides = {
        k: tuple(v) for k, v in shape_overrides.items() if isinstance(k, str)
    }
    idx_overrides = {
        int(k): tuple(v) for k, v in shape_overrides.items() if isinstance(k, int)
    }

    arrays_by_name = {}
    arrays_in_order = []

    for idx, name, elem_type, shape in onnx_inputs:
        if elem_type not in _ONNX_TO_NP:
            raise ValueError(f"Unsupported ONNX dtype for input '{name}': {elem_type}")
        np_dtype = _ONNX_TO_NP[elem_type]

        # Resolve shape: model -> replace unknowns with dyn_dim_default ->
        # apply overrides (index keys win over name keys).
        resolved_shape = list(shape or [])
        if not resolved_shape:
            # scalar-like: treat as 1-dim with size dyn_dim_default
            resolved_shape = [dyn_dim_default]
        if idx in idx_overrides:
            resolved_shape = list(idx_overrides[idx])
        elif name in name_overrides:
            resolved_shape = list(name_overrides[name])
        # Make sure no zero/negative/unknown dims survive.
        resolved_shape = [
            int(d if d and d > 0 else dyn_dim_default) for d in resolved_shape
        ]
        size = int(np.prod(resolved_shape))

        if np.issubdtype(np_dtype, np.floating):
            lo, hi = float_range
            # Generate in float32/64 and cast afterwards: f16/bf16 have no
            # native RNG path.
            narrow = (np.float16, getattr(np, "bfloat16", np.float32))
            base_dtype = np.float32 if np_dtype in narrow else np_dtype
            arr = rng.uniform(lo, hi, size=size).astype(base_dtype)
            arr = arr.reshape(resolved_shape)
            if np_dtype is np.float16:
                arr = arr.astype(np.float16)
            elif (
                getattr(np, "bfloat16", None) is not None
                and np_dtype is np.bfloat16
            ):
                arr = arr.astype(np.bfloat16)
        elif np_dtype == np.uint8 and elem_type == TensorProto.BOOL:
            # Bool as 0/1 bytes (must be checked before the generic integer
            # branch, since BOOL maps to uint8).
            arr = (rng.random(size=size) < 0.5).astype(np.uint8)
            arr = arr.reshape(resolved_shape)
        elif np.issubdtype(np_dtype, np.integer):
            lo, hi = int_range
            bounds = _dtype_bounds(np_dtype)
            if bounds is not None:
                lo = max(lo, bounds[0])
                hi = min(hi, bounds[1])
            # rng.integers is exclusive of high by default; +1 makes the
            # requested range inclusive.
            arr = rng.integers(lo, hi + 1, size=size, dtype=np_dtype)
            arr = arr.reshape(resolved_shape)
        else:
            raise ValueError(
                f"Unhandled dtype mapping for input '{name}' (elem_type={elem_type})."
            )

        arrays_by_name[name] = arr
        arrays_in_order.append(arr)

    return arrays_in_order, arrays_by_name


def save_inputs_to_files(onnx_path, arrays_in_order, out_dir):
    """
    Save arrays to CSV files.

    Returns (flags, files) where flags is a list like
    ["--in0-csv-file", "...", "--in0-shape", "Dx...xD", ...]
    and files is the list of created paths.
    """
    out_dir = pathlib.Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    ins, _ = onnx_io(onnx_path)
    flags = []
    files = []
    for idx, _name, _et, _shape in ins:
        arr = arrays_in_order[idx]
        csv_path = out_dir / f"in{idx}.csv"
        # Write row-major flattened values, comma-separated.
        with open(csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            # For 2D, write each row; otherwise write one flattened row.
            if arr.ndim == 2:
                for r in range(arr.shape[0]):
                    writer.writerow(arr[r].reshape(-1))
            else:
                writer.writerow(arr.flatten())
        shape_str = "x".join(str(d) for d in arr.shape)
        flags += [
            f"--in{idx}-csv-file",
            str(csv_path),
            f"--in{idx}-shape",
            shape_str,
        ]
        files.append(str(csv_path))
    return flags, files


def write_inputs_to_memory_bin(memory_bin_path, config_json_path, arrays_in_order):
    """Overwrite input regions in memory.bin at addresses from config.json.

    Raises
    ------
    ValueError
        If the number of addresses in the config does not match the number of
        arrays.  (Fix: the original used ``assert``, which is stripped under
        ``python -O`` and would then silently zip-truncate.)
    """
    with open(config_json_path) as f:
        config = json.load(f)
    input_addresses = config["inputs_addresses"]
    if len(input_addresses) != len(arrays_in_order):
        raise ValueError(
            f"Address/input count mismatch: "
            f"{len(input_addresses)} vs {len(arrays_in_order)}"
        )
    with open(memory_bin_path, "r+b") as f:
        for addr, arr in zip(input_addresses, arrays_in_order):
            # Force native byte order so the raw bytes match the target layout.
            native = arr.astype(arr.dtype.newbyteorder("="), copy=False)
            f.seek(addr)
            f.write(native.tobytes(order="C"))