296 lines
11 KiB
Python
296 lines
11 KiB
Python
#!/usr/bin/env python3.13
|
|
|
|
import argparse
|
|
import math
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
from PIL import Image, ImageDraw
|
|
|
|
# Directory layout derived from this script's own location:
#   SCRIPT_DIR      - directory containing this file
#   VALIDATION_DIR  - its parent
#   REPO_ROOT       - the parent of VALIDATION_DIR
SCRIPT_DIR = Path(__file__).resolve().parent
VALIDATION_DIR = SCRIPT_DIR.parent
REPO_ROOT = SCRIPT_DIR.parent.parent

# Put VALIDATION_DIR at the front of the import path (once) before the
# project-local imports that follow.
# NOTE(review): presumably onnx_utils / validate_one live there - confirm.
if str(VALIDATION_DIR) not in sys.path:
    sys.path.insert(0, str(VALIDATION_DIR))
|
|
|
|
from onnx_utils import _ONNX_TO_NP, onnx_io, write_inputs_to_memory_bin
|
|
from validate_one import (
|
|
MODE_COMPILE_ONLY,
|
|
build_dump_ranges,
|
|
parse_pim_simulator_outputs,
|
|
run_pim_simulator,
|
|
sanitize_output_name,
|
|
validate_network,
|
|
)
|
|
from yolo_real_image_validation import save_tensor_csv
|
|
|
|
# Per-channel mean and standard deviation used to normalize RGB inputs that
# have already been scaled to [0, 1] (see preprocess_classification_image).
IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)

# Models selected by `--network vgg` / `--network resnet` when no explicit
# `--model` path is given.
DEFAULT_VGG_MODEL = VALIDATION_DIR.joinpath("networks", "vgg16", "depth_35", "vgg16_depth_35.onnx")
DEFAULT_RESNET_MODEL = VALIDATION_DIR.joinpath("networks", "resnet", "resnet18_torchvision.onnx")
|
|
|
|
|
|
def resolve_default_paths():
    """Return the default tool locations, all expressed relative to REPO_ROOT.

    Returns:
        A dict with the keys ``raptor_path``, ``onnx_include_dir`` and
        ``simulator_dir``; they are used as argparse defaults in ``main``.
    """
    raptor_binary = REPO_ROOT.joinpath("build_release", "Release", "bin", "onnx-mlir")
    include_dir = REPO_ROOT.joinpath("onnx-mlir", "include")
    simulator_dir = REPO_ROOT.joinpath("backend-simulators", "pim", "pim-simulator")
    return {
        "raptor_path": raptor_binary,
        "onnx_include_dir": include_dir,
        "simulator_dir": simulator_dir,
    }
|
|
|
|
|
|
def resolve_model_path(network: str | None, model: Path | None) -> Path:
|
|
if model is not None:
|
|
return model.resolve()
|
|
if network == "resnet":
|
|
return DEFAULT_RESNET_MODEL.resolve()
|
|
if network == "vgg":
|
|
return DEFAULT_VGG_MODEL.resolve()
|
|
raise SystemExit("Pass --model or select a default with --network {resnet,vgg}.")
|
|
|
|
|
|
def ensure_local_artifacts(args, model_path: Path):
    """Invoke ``validate_network`` in compile-only mode for ``model_path``.

    Tool paths, crossbar/core settings, the command timeout and the verbosity
    flag are forwarded unchanged from the parsed command-line ``args``.
    NOTE(review): presumably this produces the runner and PIM artifacts that
    ``ensure_existing_artifacts`` checks for - confirm against validate_one.
    """
    # Every forwarded keyword has the same name as the attribute on ``args``.
    forwarded_names = (
        "raptor_path",
        "onnx_include_dir",
        "simulator_dir",
        "crossbar_size",
        "crossbar_count",
        "core_count",
        "command_timeout_seconds",
        "verbose",
    )
    forwarded = {name: getattr(args, name) for name in forwarded_names}
    validate_network(
        network_onnx_path=model_path,
        mode=MODE_COMPILE_ONLY,
        **forwarded,
    )
|
|
|
|
|
|
def ensure_existing_artifacts(model_dir: Path):
    """Verify that previously compiled artifacts are present under ``model_dir``.

    Checks for the runner binary (``runner/build/runner``) and the PIM
    ``config.json`` / ``memory.bin`` files (``raptor/pim``).

    Raises:
        FileNotFoundError: listing every required path that does not exist.
    """
    pim_dir = model_dir / "raptor" / "pim"
    expected = (
        model_dir / "runner" / "build" / "runner",
        pim_dir / "config.json",
        pim_dir / "memory.bin",
    )

    absent = []
    for candidate in expected:
        if not candidate.exists():
            absent.append(str(candidate))

    if not absent:
        return

    header = "Missing compiled local artifacts. Re-run without --skip-compile or restore these paths:\n "
    raise FileNotFoundError(header + "\n ".join(absent))
|
|
|
|
|
|
def preprocess_classification_image(image_path: Path) -> tuple[Image.Image, np.ndarray]:
    """Load an image and build the normalized classifier input tensor.

    Pipeline: convert to RGB, resize (bilinear) so the shorter side is 256
    pixels, center-crop 224x224, scale to [0, 1], normalize each channel with
    ``IMAGENET_MEAN`` / ``IMAGENET_STD``, then reorder to channels-first and
    add a leading batch axis.

    Args:
        image_path: Path of the image file to read.

    Returns:
        ``(image, tensor)`` where ``image`` is the full-size RGB image (not
        resized or cropped) and ``tensor`` is a float32 array of shape
        ``(1, 3, 224, 224)``.
    """
    # Image.open keeps the underlying file open; use it as a context manager so
    # the handle is released deterministically. convert() returns a separate
    # in-memory image, so it stays valid after the source is closed.
    with Image.open(image_path) as source:
        image = source.convert("RGB")

    width, height = image.size
    scale = 256.0 / min(width, height)
    resized_size = (
        max(1, int(round(width * scale))),
        max(1, int(round(height * scale))),
    )
    resized = image.resize(resized_size, Image.Resampling.BILINEAR)

    # Center crop: the shorter side is 256 after resizing, so a 224 window fits.
    left = (resized.width - 224) // 2
    top = (resized.height - 224) // 2
    cropped = resized.crop((left, top, left + 224, top + 224))

    # HWC uint8 -> float32 in [0, 1], then per-channel normalization (the
    # 3-element mean/std broadcast over the trailing channel axis).
    array = np.asarray(cropped, dtype=np.float32) / 255.0
    array = (array - IMAGENET_MEAN) / IMAGENET_STD

    # HWC -> CHW, then add the batch dimension.
    chw = np.transpose(array, (2, 0, 1))
    tensor = np.expand_dims(chw.astype(np.float32, copy=False), axis=0)
    return image, tensor
|
|
|
|
|
|
def load_labels(labels_path: Path | None) -> list[str] | None:
|
|
if labels_path is None:
|
|
return None
|
|
labels = [line.strip() for line in labels_path.read_text().splitlines()]
|
|
return labels or None
|
|
|
|
|
|
def softmax(values: np.ndarray) -> np.ndarray:
    """Return the softmax of ``values``, normalized over all elements.

    The maximum is subtracted before exponentiating so large scores do not
    overflow.

    Raises:
        RuntimeError: if the normalizing sum is not a finite positive number
            (for example when the input contains NaN).
    """
    exponentials = np.exp(values - np.max(values))
    denom = exponentials.sum()
    total = float(denom)
    if math.isfinite(total) and total > 0.0:
        return exponentials / denom
    raise RuntimeError("Softmax received non-finite output scores.")
|
|
|
|
|
|
def decode_classification_output(output: np.ndarray, labels: list[str] | None, top_k: int):
    """Convert raw classifier scores into the ``top_k`` most probable classes.

    The scores are flattened, converted to float64 and passed through
    ``softmax``; the highest-probability entries are returned in descending
    order of probability.

    Args:
        output: Raw model scores of any shape (flattened internally).
        labels: Optional class names indexed by class id.
        top_k: Maximum number of entries to return. Values above the number of
            classes are clamped; zero or negative values give an empty list.

    Returns:
        A list of dicts with keys ``index`` (int), ``label`` (str or ``None``
        when no label is available for that index) and ``probability`` (float).

    Raises:
        RuntimeError: propagated from ``softmax`` for non-finite scores.
    """
    scores = np.asarray(output, dtype=np.float64).reshape(-1)
    probabilities = softmax(scores)

    # Clamp to [0, size]. Zero must be handled explicitly: the slice [-0:] is
    # the same as [0:] and would select every class instead of none.
    limit = max(0, min(top_k, probabilities.size))
    if limit == 0:
        return []

    # argsort is ascending: take the last ``limit`` entries and reverse them.
    top_indices = np.argsort(probabilities)[-limit:][::-1]

    results = []
    for raw_index in top_indices:
        index = int(raw_index)
        has_label = labels is not None and 0 <= index < len(labels)
        results.append(
            {
                "index": index,
                "label": labels[index] if has_label else None,
                "probability": float(probabilities[index]),
            }
        )
    return results
|
|
|
|
|
|
def render_result_line(result) -> str:
    """Format one prediction as ``"<name>: <percent>%"`` with two decimals.

    ``result`` is a mapping with ``label``, ``index`` and ``probability``
    keys; when the label is missing or empty, ``class <index>`` is used as
    the name.
    """
    name = result["label"]
    if not name:
        name = f'class {result["index"]}'
    percent = result["probability"] * 100.0
    return f"{name}: {percent:.2f}%"
|
|
|
|
|
|
def draw_classification_panel(image: Image.Image, results, output_path: Path):
    """Save a copy of ``image`` with the predictions listed in a black panel.

    One line per entry of ``results`` (formatted by ``render_result_line``) is
    drawn in white on a rounded black rectangle near the top-left corner; when
    ``results`` is empty the panel reads "No predictions". The input image is
    not modified.
    """
    canvas = image.copy()
    painter = ImageDraw.Draw(canvas)

    captions = [render_result_line(entry) for entry in results] or ["No predictions"]

    padding, line_gap = 10, 4
    origin_x = origin_y = 12

    # Measure each caption (no font is passed, so Pillow's default is used) to
    # size the panel to the widest line and the stacked line heights.
    extents = [painter.textbbox((0, 0), caption) for caption in captions]
    heights = [bottom - top for _left, top, _right, bottom in extents]
    widest = max(0, *(right - left for left, _top, right, _bottom in extents))

    panel_width = padding * 2 + widest
    panel_height = padding * 2 + sum(heights) + line_gap * (len(captions) - 1)
    painter.rounded_rectangle(
        (origin_x, origin_y, origin_x + panel_width, origin_y + panel_height),
        radius=10,
        fill=(0, 0, 0),
    )

    # Write the captions top to bottom inside the padded panel area.
    cursor_y = origin_y + padding
    for caption, height in zip(captions, heights):
        painter.text((origin_x + padding, cursor_y), caption, fill=(255, 255, 255))
        cursor_y += height + line_gap

    canvas.save(output_path)
|
|
|
|
|
|
def run_reference_and_simulator(args, model_path: Path, tensor: np.ndarray):
    """Run ``tensor`` through the compiled reference runner and the PIM simulator.

    Working files are written below ``<model dir>/classification_demo``
    (``inputs``, ``reference`` and ``simulation`` sub-directories). The
    compiled artifacts are expected next to the model, in ``runner/build`` and
    ``raptor/pim``.

    Args:
        args: Parsed command-line arguments; ``simulator_dir`` and
            ``command_timeout_seconds`` are read here.
        model_path: Path of the ONNX model.
        tensor: Input tensor; its shape must equal the model's input shape.

    Returns:
        ``(reference_output, simulator_output)`` for the single model output.

    Raises:
        RuntimeError: if the model does not have exactly one input and one
            output, or if ``tensor`` has the wrong shape.
        subprocess.CalledProcessError: if the reference runner exits non-zero.
        subprocess.TimeoutExpired: if the reference runner exceeds
            ``args.command_timeout_seconds``.
    """
    model_dir = model_path.parent
    runner_build_dir = model_dir / "runner" / "build"
    runner_path = runner_build_dir / "runner"
    pim_dir = model_dir / "raptor" / "pim"
    demo_dir = model_dir / "classification_demo"
    simulation_dir = demo_dir / "simulation"
    reference_dir = demo_dir / "reference"
    inputs_dir = demo_dir / "inputs"

    for directory in (simulation_dir, reference_dir, inputs_dir):
        directory.mkdir(parents=True, exist_ok=True)

    input_descriptors, output_descriptors = onnx_io(model_path)
    if len(input_descriptors) != 1:
        raise RuntimeError(f"Expected one classification input tensor, found {len(input_descriptors)}")
    if len(output_descriptors) != 1:
        raise RuntimeError(f"Expected one classification output tensor, found {len(output_descriptors)}")

    input_index, _input_name, _input_dtype, input_shape = input_descriptors[0]
    if list(tensor.shape) != list(input_shape):
        raise RuntimeError(f"Preprocessed tensor shape {list(tensor.shape)} does not match model input {input_shape}")

    input_csv = inputs_dir / "in0.csv"
    save_tensor_csv(tensor, input_csv)

    # Reference run: the compiled runner reads the CSV input and writes its
    # outputs as CSV files into reference_dir.
    runner_cmd = [
        str(runner_path),
        f"--in{input_index}-csv-file",
        str(input_csv),
        f"--in{input_index}-shape",
        "x".join(str(dim) for dim in tensor.shape),
        "--save-csv-dir",
        str(reference_dir),
    ]
    # Apply the same user-configured timeout as the simulator run below so a
    # hung runner cannot block the script indefinitely.
    subprocess.run(
        runner_cmd,
        cwd=runner_build_dir,
        check=True,
        timeout=args.command_timeout_seconds,
    )

    # Simulator run: place the input into memory.bin, then dump the output
    # ranges to out.bin.
    # NOTE(review): memory.bin appears to be modified in place - confirm in
    # onnx_utils.write_inputs_to_memory_bin.
    write_inputs_to_memory_bin(pim_dir / "memory.bin", pim_dir / "config.json", [tensor])
    dump_ranges = build_dump_ranges(pim_dir / "config.json", output_descriptors)
    output_bin_path = simulation_dir / "out.bin"
    run_pim_simulator(
        args.simulator_dir,
        pim_dir,
        output_bin_path,
        dump_ranges,
        timeout_sec=args.command_timeout_seconds,
    )

    output_index, output_name, output_dtype_code, output_shape = output_descriptors[0]
    output_dtype = np.dtype(_ONNX_TO_NP[output_dtype_code])
    reference_csv = reference_dir / f"output{output_index}_{sanitize_output_name(output_name)}.csv"
    reference_output = np.loadtxt(reference_csv, delimiter=",", dtype=output_dtype).reshape(output_shape)
    simulator_output = parse_pim_simulator_outputs(output_bin_path, output_descriptors)[0]
    return reference_output, simulator_output
|
|
|
|
|
|
def print_topk(title: str, results):
    """Print ``title`` followed by one ranked line per prediction.

    Each line shows the 1-based rank, the label (or ``class <index>`` when the
    label is missing or empty), the probability as a percentage with two
    decimals, and the class index.
    """
    print(title)
    for position, entry in enumerate(results, start=1):
        name = entry["label"] or f'class {entry["index"]}'
        percent = entry["probability"] * 100.0
        print(f' {position}. {name} ({percent:.2f}%) [index={entry["index"]}]')
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the arguments, compiles the model (or checks that compiled
    artifacts exist when ``--skip-compile`` is given), preprocesses the image,
    runs the reference runner and the simulator, prints both top-k lists and
    the maximum absolute score difference, and saves the image annotated with
    the simulator's predictions.
    """
    defaults = resolve_default_paths()

    parser = argparse.ArgumentParser(description="Run a VGG or ResNet ONNX model through the Raptor simulator and annotate the image with top classification results.")
    parser.add_argument("--model", type=Path, default=None)
    parser.add_argument("--network", choices=("resnet", "vgg"), default=None)
    parser.add_argument("--image", type=Path, required=True)
    parser.add_argument("--labels", type=Path, default=None)
    parser.add_argument("--output", type=Path, required=True)
    parser.add_argument("--raptor-path", type=Path, default=defaults["raptor_path"])
    parser.add_argument("--onnx-include-dir", type=Path, default=defaults["onnx_include_dir"])
    parser.add_argument("--simulator-dir", type=Path, default=defaults["simulator_dir"])
    parser.add_argument("--crossbar-size", type=int, default=2048)
    parser.add_argument("--crossbar-count", type=int, default=256)
    parser.add_argument("--core-count", type=int, default=1000)
    parser.add_argument("--top-k", type=int, default=5)
    parser.add_argument("--command-timeout-seconds", type=float, default=7200.0)
    parser.add_argument("--skip-compile", action="store_true")
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args()

    # Work with absolute paths from here on.
    args.model = resolve_model_path(args.network, args.model)
    args.image = args.image.resolve()
    args.output = args.output.resolve()
    args.labels = args.labels.resolve() if args.labels is not None else None
    args.raptor_path = args.raptor_path.resolve()
    args.onnx_include_dir = args.onnx_include_dir.resolve()
    args.simulator_dir = args.simulator_dir.resolve()

    # Fail fast on bad arguments instead of discovering them only after the
    # compile step has already run.
    if args.top_k < 1:
        parser.error("--top-k must be a positive integer")
    if not args.image.is_file():
        parser.error(f"--image is not an existing file: {args.image}")
    if args.labels is not None and not args.labels.is_file():
        parser.error(f"--labels is not an existing file: {args.labels}")

    if not args.skip_compile:
        ensure_local_artifacts(args, args.model)
    else:
        ensure_existing_artifacts(args.model.parent)

    original_image, tensor = preprocess_classification_image(args.image)
    labels = load_labels(args.labels)
    reference_output, simulator_output = run_reference_and_simulator(args, args.model, tensor)
    reference_results = decode_classification_output(reference_output, labels, args.top_k)
    simulator_results = decode_classification_output(simulator_output, labels, args.top_k)

    print_topk("Reference top-k:", reference_results)
    print_topk("Simulator top-k:", simulator_results)

    # Compare the raw scores (not the softmax probabilities) element-wise.
    reference_scores = np.asarray(reference_output, dtype=np.float64).reshape(-1)
    simulator_scores = np.asarray(simulator_output, dtype=np.float64).reshape(-1)
    max_abs_diff = float(np.max(np.abs(reference_scores - simulator_scores)))
    print(f"Max absolute score diff: {max_abs_diff:.6e}")

    args.output.parent.mkdir(parents=True, exist_ok=True)
    draw_classification_panel(original_image, simulator_results, args.output)
    print(f"Annotated image saved to {args.output}")


if __name__ == "__main__":
    main()
|