#!/usr/bin/env python3.13
"""Run a VGG or ResNet ONNX model through the Raptor simulator and annotate
an image with the top classification results.

The script preprocesses one image, runs it through both the compiled
reference runner and the PIM simulator, prints the top-k classes from each,
reports the maximum absolute difference between the two raw outputs, and
saves a copy of the image with the simulator's predictions drawn on it.
"""
import argparse
import math
import subprocess
import sys
from pathlib import Path

import numpy as np
from PIL import Image, ImageDraw

SCRIPT_DIR = Path(__file__).resolve().parent
VALIDATION_DIR = SCRIPT_DIR.parent
REPO_ROOT = VALIDATION_DIR.parent

# The helper modules imported below are resolved through VALIDATION_DIR, so
# it has to be on sys.path before those imports run.
if str(VALIDATION_DIR) not in sys.path:
    sys.path.insert(0, str(VALIDATION_DIR))

from onnx_utils import _ONNX_TO_NP, onnx_io, write_inputs_to_memory_bin
from validate_one import (
    MODE_COMPILE_ONLY,
    build_dump_ranges,
    parse_pim_simulator_outputs,
    run_pim_simulator,
    sanitize_output_name,
    validate_network,
)
from yolo_real_image_validation import save_tensor_csv

# Per-channel normalisation constants applied after scaling pixels to [0, 1].
IMAGENET_MEAN = np.asarray([0.485, 0.456, 0.406], dtype=np.float32)
IMAGENET_STD = np.asarray([0.229, 0.224, 0.225], dtype=np.float32)

DEFAULT_VGG_MODEL = VALIDATION_DIR / "networks" / "vgg16" / "depth_35" / "vgg16_depth_35.onnx"
DEFAULT_RESNET_MODEL = VALIDATION_DIR / "networks" / "resnet" / "resnet18_torchvision.onnx"


def resolve_default_paths():
    """Return the default tool locations, all derived from ``REPO_ROOT``.

    Returns:
        A dict with the keys ``raptor_path``, ``onnx_include_dir`` and
        ``simulator_dir``, each mapped to a ``Path``.
    """
    return {
        "raptor_path": REPO_ROOT / "build_release" / "Release" / "bin" / "onnx-mlir",
        "onnx_include_dir": REPO_ROOT / "onnx-mlir" / "include",
        "simulator_dir": REPO_ROOT / "backend-simulators" / "pim" / "pim-simulator",
    }


def resolve_model_path(network: str | None, model: Path | None) -> Path:
    """Pick the ONNX model to run.

    An explicit ``model`` path always wins; otherwise ``network`` selects one
    of the bundled defaults.

    Args:
        network: ``"resnet"``, ``"vgg"`` or ``None``.
        model: Explicit model path, or ``None``.

    Returns:
        The resolved (absolute) model path.

    Raises:
        SystemExit: If neither a model nor a known network was given.
    """
    if model is not None:
        return model.resolve()
    if network == "resnet":
        return DEFAULT_RESNET_MODEL.resolve()
    if network == "vgg":
        return DEFAULT_VGG_MODEL.resolve()
    raise SystemExit("Pass --model or select a default with --network {resnet,vgg}.")


def ensure_local_artifacts(args, model_path: Path):
    """Run ``validate_network`` in compile-only mode for ``model_path``.

    Args:
        args: Parsed command-line namespace supplying the tool paths, the
            crossbar/core settings, the command timeout and the verbose flag.
        model_path: ONNX model to compile.
    """
    validate_network(
        network_onnx_path=model_path,
        raptor_path=args.raptor_path,
        onnx_include_dir=args.onnx_include_dir,
        simulator_dir=args.simulator_dir,
        crossbar_size=args.crossbar_size,
        crossbar_count=args.crossbar_count,
        core_count=args.core_count,
        command_timeout_seconds=args.command_timeout_seconds,
        mode=MODE_COMPILE_ONLY,
        verbose=args.verbose,
    )


def ensure_existing_artifacts(model_dir: Path):
    """Check that the artifacts needed to skip compilation already exist.

    Args:
        model_dir: Directory containing the model and its build outputs.

    Raises:
        FileNotFoundError: If the runner binary, ``config.json`` or
            ``memory.bin`` is missing; the message lists every missing path.
    """
    required_paths = [
        model_dir / "runner" / "build" / "runner",
        model_dir / "raptor" / "pim" / "config.json",
        model_dir / "raptor" / "pim" / "memory.bin",
    ]
    missing = [str(path) for path in required_paths if not path.exists()]
    if missing:
        raise FileNotFoundError(
            "Missing compiled local artifacts. Re-run without --skip-compile or restore these paths:\n "
            + "\n ".join(missing)
        )


def preprocess_classification_image(image_path: Path) -> tuple[Image.Image, np.ndarray]:
    """Load an image and turn it into a normalised classification input.

    The image is converted to RGB, resized with bilinear resampling so its
    shorter side is 256 pixels, centre-cropped to 224x224, scaled to
    ``[0, 1]``, normalised with ``IMAGENET_MEAN`` / ``IMAGENET_STD`` and
    rearranged to a ``(1, 3, 224, 224)`` float32 array.

    Args:
        image_path: Path of the image file to read.

    Returns:
        ``(image, tensor)`` where ``image`` is the full-size RGB image and
        ``tensor`` is the preprocessed array.
    """
    # Use a context manager so the underlying file handle is released as soon
    # as the pixel data has been copied into the converted image.
    with Image.open(image_path) as source:
        image = source.convert("RGB")

    width, height = image.size
    scale = 256.0 / min(width, height)
    resized_size = (
        max(1, int(round(width * scale))),
        max(1, int(round(height * scale))),
    )
    resized = image.resize(resized_size, Image.Resampling.BILINEAR)

    left = (resized.width - 224) // 2
    top = (resized.height - 224) // 2
    cropped = resized.crop((left, top, left + 224, top + 224))

    array = np.asarray(cropped, dtype=np.float32) / 255.0
    array = (array - IMAGENET_MEAN) / IMAGENET_STD
    # HWC -> CHW, then add the leading batch axis.
    chw = np.transpose(array, (2, 0, 1))
    tensor = np.expand_dims(chw.astype(np.float32, copy=False), axis=0)
    return image, tensor


def load_labels(labels_path: Path | None) -> list[str] | None:
    """Read one class label per line from ``labels_path``.

    Blank lines are kept (as empty strings) so that a label's position in the
    list still matches its line number in the file.

    Args:
        labels_path: Text file with labels, or ``None``.

    Returns:
        The stripped lines, or ``None`` when no path was given or the file
        has no lines.
    """
    if labels_path is None:
        return None
    # Read as UTF-8 explicitly so the result does not depend on the locale.
    labels = [line.strip() for line in labels_path.read_text(encoding="utf-8").splitlines()]
    return labels or None


def softmax(values: np.ndarray) -> np.ndarray:
    """Return the softmax of ``values`` computed over the whole array.

    Args:
        values: Raw scores.

    Returns:
        An array of the same shape whose entries sum to one.

    Raises:
        RuntimeError: If the sum of exponentials is non-finite or not
            positive (for example when the input contains NaN or infinity).
    """
    # Subtracting the maximum keeps the exponentials from overflowing.
    shifted = values - np.max(values)
    exp = np.exp(shifted)
    denom = exp.sum()
    if not math.isfinite(float(denom)) or denom <= 0.0:
        raise RuntimeError("Softmax received non-finite output scores.")
    return exp / denom


def decode_classification_output(output: np.ndarray, labels: list[str] | None, top_k: int):
    """Convert raw model output into the ``top_k`` most probable classes.

    Args:
        output: Raw scores; flattened before use.
        labels: Optional class names indexed by class id.
        top_k: Maximum number of results to return.

    Returns:
        A list of dicts with the keys ``index``, ``label`` and
        ``probability``, ordered from most to least probable. ``label`` is
        ``None`` when no label is available for the index. The list is empty
        when ``top_k`` is not positive or ``output`` has no elements.
    """
    scores = np.asarray(output, dtype=np.float64).reshape(-1)
    limit = min(top_k, scores.size)
    if limit <= 0:
        # A zero limit would make the slice ``[-0:]`` select every class and a
        # negative one would slice from the front, so stop here instead.
        return []

    probabilities = softmax(scores)
    top_indices = np.argsort(probabilities)[-limit:][::-1]

    results = []
    for index in top_indices:
        class_index = int(index)
        label = None
        if labels is not None and 0 <= class_index < len(labels):
            label = labels[class_index]
        results.append(
            {
                "index": class_index,
                "label": label,
                "probability": float(probabilities[class_index]),
            }
        )
    return results


def _result_display_name(result) -> str:
    """Return the result's label, or ``class <index>`` when it has none."""
    return result["label"] if result["label"] else f'class {result["index"]}'


def render_result_line(result) -> str:
    """Format one decoded result as ``<name>: <percent>%`` for the overlay."""
    name = _result_display_name(result)
    return f'{name}: {result["probability"] * 100.0:.2f}%'


def draw_classification_panel(image: Image.Image, results, output_path: Path):
    """Save a copy of ``image`` with a panel listing ``results``.

    A black rounded rectangle sized to fit the text is drawn near the
    top-left corner and one white line per result is written into it. When
    ``results`` is empty the panel shows ``No predictions``. No font is
    passed to the drawing calls, so the library's default font is used.

    Args:
        image: Source image; it is copied, not modified.
        results: Decoded results as produced by
            ``decode_classification_output``.
        output_path: Destination file for the annotated image.
    """
    annotated = image.copy()
    draw = ImageDraw.Draw(annotated)

    lines = [render_result_line(result) for result in results]
    if not lines:
        lines = ["No predictions"]

    padding = 10
    line_gap = 4

    # Measure every line first so the panel can be sized before it is drawn.
    max_width = 0
    line_heights = []
    for line in lines:
        left, top, right, bottom = draw.textbbox((0, 0), line)
        max_width = max(max_width, right - left)
        line_heights.append(bottom - top)

    panel_height = padding * 2 + sum(line_heights) + line_gap * (len(lines) - 1)
    panel_width = padding * 2 + max_width
    origin_x = 12
    origin_y = 12

    draw.rounded_rectangle(
        (origin_x, origin_y, origin_x + panel_width, origin_y + panel_height),
        radius=10,
        fill=(0, 0, 0),
    )

    y = origin_y + padding
    for line, line_height in zip(lines, line_heights):
        draw.text((origin_x + padding, y), line, fill=(255, 255, 255))
        y += line_height + line_gap

    annotated.save(output_path)


def run_reference_and_simulator(args, model_path: Path, tensor: np.ndarray):
    """Run ``tensor`` through the reference runner and the PIM simulator.

    Working files are placed under ``<model dir>/classification_demo``. The
    input is written as CSV and passed to the compiled runner, whose output
    CSV is read back as the reference. The same tensor is written into the
    simulator's ``memory.bin`` before the simulator is run and its dumped
    output parsed.

    Args:
        args: Parsed command-line namespace; ``simulator_dir`` and
            ``command_timeout_seconds`` are read from it.
        model_path: ONNX model whose sibling directories hold the runner and
            simulator artifacts.
        tensor: Preprocessed input; its shape must equal the model input
            shape.

    Returns:
        ``(reference_output, simulator_output)``.

    Raises:
        RuntimeError: If the model does not have exactly one input and one
            output, or the tensor shape does not match the model input.
        subprocess.CalledProcessError: If the runner exits with a non-zero
            status.
        subprocess.TimeoutExpired: If the runner exceeds
            ``args.command_timeout_seconds``.
    """
    model_dir = model_path.parent
    runner_build_dir = model_dir / "runner" / "build"
    runner_path = runner_build_dir / "runner"
    pim_dir = model_dir / "raptor" / "pim"
    simulation_dir = model_dir / "classification_demo" / "simulation"
    reference_dir = model_dir / "classification_demo" / "reference"
    inputs_dir = model_dir / "classification_demo" / "inputs"
    simulation_dir.mkdir(parents=True, exist_ok=True)
    reference_dir.mkdir(parents=True, exist_ok=True)
    inputs_dir.mkdir(parents=True, exist_ok=True)

    input_descriptors, output_descriptors = onnx_io(model_path)
    if len(input_descriptors) != 1:
        raise RuntimeError(f"Expected one classification input tensor, found {len(input_descriptors)}")
    if len(output_descriptors) != 1:
        raise RuntimeError(f"Expected one classification output tensor, found {len(output_descriptors)}")

    input_index, _input_name, _input_dtype, input_shape = input_descriptors[0]
    if list(tensor.shape) != list(input_shape):
        raise RuntimeError(f"Preprocessed tensor shape {list(tensor.shape)} does not match model input {input_shape}")

    input_csv = inputs_dir / "in0.csv"
    save_tensor_csv(tensor, input_csv)

    runner_cmd = [
        str(runner_path),
        f"--in{input_index}-csv-file",
        str(input_csv),
        f"--in{input_index}-shape",
        "x".join(str(dim) for dim in tensor.shape),
        "--save-csv-dir",
        str(reference_dir),
    ]
    # Apply the same timeout as the simulator so a hung runner cannot block
    # the script indefinitely.
    subprocess.run(
        runner_cmd,
        cwd=runner_build_dir,
        check=True,
        timeout=args.command_timeout_seconds,
    )

    write_inputs_to_memory_bin(pim_dir / "memory.bin", pim_dir / "config.json", [tensor])
    dump_ranges = build_dump_ranges(pim_dir / "config.json", output_descriptors)
    output_bin_path = simulation_dir / "out.bin"
    run_pim_simulator(
        args.simulator_dir,
        pim_dir,
        output_bin_path,
        dump_ranges,
        timeout_sec=args.command_timeout_seconds,
    )

    output_index, output_name, output_dtype_code, output_shape = output_descriptors[0]
    output_dtype = np.dtype(_ONNX_TO_NP[output_dtype_code])
    reference_csv = reference_dir / f"output{output_index}_{sanitize_output_name(output_name)}.csv"
    reference_output = np.loadtxt(reference_csv, delimiter=",", dtype=output_dtype).reshape(output_shape)
    simulator_output = parse_pim_simulator_outputs(output_bin_path, output_descriptors)[0]
    return reference_output, simulator_output


def print_topk(title: str, results):
    """Print ``title`` followed by one ranked line per decoded result."""
    print(title)
    for rank, result in enumerate(results, start=1):
        label_text = _result_display_name(result)
        print(f' {rank}. {label_text} ({result["probability"] * 100.0:.2f}%) [index={result["index"]}]')


def main():
    """Parse the command line and run the classification demo end to end."""
    defaults = resolve_default_paths()
    parser = argparse.ArgumentParser(description="Run a VGG or ResNet ONNX model through the Raptor simulator and annotate the image with top classification results.")
    parser.add_argument("--model", type=Path, default=None)
    parser.add_argument("--network", choices=("resnet", "vgg"), default=None)
    parser.add_argument("--image", type=Path, required=True)
    parser.add_argument("--labels", type=Path, default=None)
    parser.add_argument("--output", type=Path, required=True)
    parser.add_argument("--raptor-path", type=Path, default=defaults["raptor_path"])
    parser.add_argument("--onnx-include-dir", type=Path, default=defaults["onnx_include_dir"])
    parser.add_argument("--simulator-dir", type=Path, default=defaults["simulator_dir"])
    parser.add_argument("--crossbar-size", type=int, default=2048)
    parser.add_argument("--crossbar-count", type=int, default=256)
    parser.add_argument("--core-count", type=int, default=1000)
    parser.add_argument("--top-k", type=int, default=5)
    parser.add_argument("--command-timeout-seconds", type=float, default=7200.0)
    parser.add_argument("--skip-compile", action="store_true")
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args()

    args.model = resolve_model_path(args.network, args.model)
    args.image = args.image.resolve()
    args.output = args.output.resolve()
    args.labels = args.labels.resolve() if args.labels else None
    args.raptor_path = args.raptor_path.resolve()
    args.onnx_include_dir = args.onnx_include_dir.resolve()
    args.simulator_dir = args.simulator_dir.resolve()

    if not args.skip_compile:
        ensure_local_artifacts(args, args.model)
    else:
        ensure_existing_artifacts(args.model.parent)

    original_image, tensor = preprocess_classification_image(args.image)
    labels = load_labels(args.labels)
    reference_output, simulator_output = run_reference_and_simulator(args, args.model, tensor)

    reference_results = decode_classification_output(reference_output, labels, args.top_k)
    simulator_results = decode_classification_output(simulator_output, labels, args.top_k)
    print_topk("Reference top-k:", reference_results)
    print_topk("Simulator top-k:", simulator_results)

    reference_scores = np.asarray(reference_output, dtype=np.float64).reshape(-1)
    simulator_scores = np.asarray(simulator_output, dtype=np.float64).reshape(-1)
    if reference_scores.shape != simulator_scores.shape:
        # Without this check a single-element array would silently broadcast
        # against the other one and yield a meaningless difference.
        raise RuntimeError(
            f"Reference output has {reference_scores.size} values but simulator output has {simulator_scores.size}"
        )
    max_abs_diff = float(np.max(np.abs(reference_scores - simulator_scores)))
    print(f"Max absolute score diff: {max_abs_diff:.6e}")

    args.output.parent.mkdir(parents=True, exist_ok=True)
    draw_classification_panel(original_image, simulator_results, args.output)
    print(f"Annotated image saved to {args.output}")


if __name__ == "__main__":
    main()