This commit is contained in:
@@ -0,0 +1,295 @@
|
||||
#!/usr/bin/env python3.13
|
||||
|
||||
import argparse
|
||||
import math
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
VALIDATION_DIR = SCRIPT_DIR.parent
|
||||
REPO_ROOT = VALIDATION_DIR.parent
|
||||
if str(VALIDATION_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(VALIDATION_DIR))
|
||||
|
||||
from onnx_utils import _ONNX_TO_NP, onnx_io, write_inputs_to_memory_bin
|
||||
from validate_one import (
|
||||
MODE_COMPILE_ONLY,
|
||||
build_dump_ranges,
|
||||
parse_pim_simulator_outputs,
|
||||
run_pim_simulator,
|
||||
sanitize_output_name,
|
||||
validate_network,
|
||||
)
|
||||
from yolo_real_image_validation import save_tensor_csv
|
||||
|
||||
IMAGENET_MEAN = np.asarray([0.485, 0.456, 0.406], dtype=np.float32)
|
||||
IMAGENET_STD = np.asarray([0.229, 0.224, 0.225], dtype=np.float32)
|
||||
DEFAULT_VGG_MODEL = VALIDATION_DIR / "networks" / "vgg16" / "depth_35" / "vgg16_depth_35.onnx"
|
||||
DEFAULT_RESNET_MODEL = VALIDATION_DIR / "networks" / "resnet" / "resnet18_torchvision.onnx"
|
||||
|
||||
|
||||
def resolve_default_paths():
|
||||
return {
|
||||
"raptor_path": REPO_ROOT / "build_release" / "Release" / "bin" / "onnx-mlir",
|
||||
"onnx_include_dir": REPO_ROOT / "onnx-mlir" / "include",
|
||||
"simulator_dir": REPO_ROOT / "backend-simulators" / "pim" / "pim-simulator",
|
||||
}
|
||||
|
||||
|
||||
def resolve_model_path(network: str | None, model: Path | None) -> Path:
|
||||
if model is not None:
|
||||
return model.resolve()
|
||||
if network == "resnet":
|
||||
return DEFAULT_RESNET_MODEL.resolve()
|
||||
if network == "vgg":
|
||||
return DEFAULT_VGG_MODEL.resolve()
|
||||
raise SystemExit("Pass --model or select a default with --network {resnet,vgg}.")
|
||||
|
||||
|
||||
def ensure_local_artifacts(args, model_path: Path):
|
||||
validate_network(
|
||||
network_onnx_path=model_path,
|
||||
raptor_path=args.raptor_path,
|
||||
onnx_include_dir=args.onnx_include_dir,
|
||||
simulator_dir=args.simulator_dir,
|
||||
crossbar_size=args.crossbar_size,
|
||||
crossbar_count=args.crossbar_count,
|
||||
core_count=args.core_count,
|
||||
command_timeout_seconds=args.command_timeout_seconds,
|
||||
mode=MODE_COMPILE_ONLY,
|
||||
verbose=args.verbose,
|
||||
)
|
||||
|
||||
|
||||
def ensure_existing_artifacts(model_dir: Path):
|
||||
required_paths = [
|
||||
model_dir / "runner" / "build" / "runner",
|
||||
model_dir / "raptor" / "pim" / "config.json",
|
||||
model_dir / "raptor" / "pim" / "memory.bin",
|
||||
]
|
||||
missing = [str(path) for path in required_paths if not path.exists()]
|
||||
if missing:
|
||||
raise FileNotFoundError(
|
||||
"Missing compiled local artifacts. Re-run without --skip-compile or restore these paths:\n "
|
||||
+ "\n ".join(missing)
|
||||
)
|
||||
|
||||
|
||||
def preprocess_classification_image(image_path: Path) -> tuple[Image.Image, np.ndarray]:
|
||||
image = Image.open(image_path).convert("RGB")
|
||||
width, height = image.size
|
||||
scale = 256.0 / min(width, height)
|
||||
resized_size = (
|
||||
max(1, int(round(width * scale))),
|
||||
max(1, int(round(height * scale))),
|
||||
)
|
||||
resized = image.resize(resized_size, Image.Resampling.BILINEAR)
|
||||
|
||||
left = (resized.width - 224) // 2
|
||||
top = (resized.height - 224) // 2
|
||||
cropped = resized.crop((left, top, left + 224, top + 224))
|
||||
|
||||
array = np.asarray(cropped, dtype=np.float32) / 255.0
|
||||
array = (array - IMAGENET_MEAN) / IMAGENET_STD
|
||||
chw = np.transpose(array, (2, 0, 1))
|
||||
tensor = np.expand_dims(chw.astype(np.float32, copy=False), axis=0)
|
||||
return image, tensor
|
||||
|
||||
|
||||
def load_labels(labels_path: Path | None) -> list[str] | None:
|
||||
if labels_path is None:
|
||||
return None
|
||||
labels = [line.strip() for line in labels_path.read_text().splitlines()]
|
||||
return labels or None
|
||||
|
||||
|
||||
def softmax(values: np.ndarray) -> np.ndarray:
|
||||
shifted = values - np.max(values)
|
||||
exp = np.exp(shifted)
|
||||
denom = exp.sum()
|
||||
if not math.isfinite(float(denom)) or denom <= 0.0:
|
||||
raise RuntimeError("Softmax received non-finite output scores.")
|
||||
return exp / denom
|
||||
|
||||
|
||||
def decode_classification_output(output: np.ndarray, labels: list[str] | None, top_k: int):
|
||||
scores = np.asarray(output, dtype=np.float64).reshape(-1)
|
||||
probabilities = softmax(scores)
|
||||
limit = min(top_k, probabilities.size)
|
||||
top_indices = np.argsort(probabilities)[-limit:][::-1]
|
||||
results = []
|
||||
for index in top_indices:
|
||||
label = None
|
||||
if labels is not None and 0 <= int(index) < len(labels):
|
||||
label = labels[int(index)]
|
||||
results.append(
|
||||
{
|
||||
"index": int(index),
|
||||
"label": label,
|
||||
"probability": float(probabilities[int(index)]),
|
||||
}
|
||||
)
|
||||
return results
|
||||
|
||||
|
||||
def render_result_line(result) -> str:
|
||||
name = result["label"] if result["label"] else f'class {result["index"]}'
|
||||
return f'{name}: {result["probability"] * 100.0:.2f}%'
|
||||
|
||||
|
||||
def draw_classification_panel(image: Image.Image, results, output_path: Path):
|
||||
annotated = image.copy()
|
||||
draw = ImageDraw.Draw(annotated)
|
||||
lines = [render_result_line(result) for result in results]
|
||||
if not lines:
|
||||
lines = ["No predictions"]
|
||||
|
||||
padding = 10
|
||||
line_gap = 4
|
||||
max_width = 0
|
||||
line_heights = []
|
||||
for line in lines:
|
||||
left, top, right, bottom = draw.textbbox((0, 0), line)
|
||||
max_width = max(max_width, right - left)
|
||||
line_heights.append(bottom - top)
|
||||
|
||||
panel_height = padding * 2 + sum(line_heights) + line_gap * (len(lines) - 1)
|
||||
panel_width = padding * 2 + max_width
|
||||
origin_x = 12
|
||||
origin_y = 12
|
||||
draw.rounded_rectangle(
|
||||
(origin_x, origin_y, origin_x + panel_width, origin_y + panel_height),
|
||||
radius=10,
|
||||
fill=(0, 0, 0),
|
||||
)
|
||||
|
||||
y = origin_y + padding
|
||||
for line, line_height in zip(lines, line_heights):
|
||||
draw.text((origin_x + padding, y), line, fill=(255, 255, 255))
|
||||
y += line_height + line_gap
|
||||
|
||||
annotated.save(output_path)
|
||||
|
||||
|
||||
def run_reference_and_simulator(args, model_path: Path, tensor: np.ndarray):
|
||||
model_dir = model_path.parent
|
||||
runner_build_dir = model_dir / "runner" / "build"
|
||||
runner_path = runner_build_dir / "runner"
|
||||
pim_dir = model_dir / "raptor" / "pim"
|
||||
simulation_dir = model_dir / "classification_demo" / "simulation"
|
||||
reference_dir = model_dir / "classification_demo" / "reference"
|
||||
inputs_dir = model_dir / "classification_demo" / "inputs"
|
||||
|
||||
simulation_dir.mkdir(parents=True, exist_ok=True)
|
||||
reference_dir.mkdir(parents=True, exist_ok=True)
|
||||
inputs_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
input_descriptors, output_descriptors = onnx_io(model_path)
|
||||
if len(input_descriptors) != 1:
|
||||
raise RuntimeError(f"Expected one classification input tensor, found {len(input_descriptors)}")
|
||||
if len(output_descriptors) != 1:
|
||||
raise RuntimeError(f"Expected one classification output tensor, found {len(output_descriptors)}")
|
||||
|
||||
input_index, _input_name, _input_dtype, input_shape = input_descriptors[0]
|
||||
if list(tensor.shape) != list(input_shape):
|
||||
raise RuntimeError(f"Preprocessed tensor shape {list(tensor.shape)} does not match model input {input_shape}")
|
||||
|
||||
input_csv = inputs_dir / "in0.csv"
|
||||
save_tensor_csv(tensor, input_csv)
|
||||
|
||||
runner_cmd = [
|
||||
str(runner_path),
|
||||
f"--in{input_index}-csv-file",
|
||||
str(input_csv),
|
||||
f"--in{input_index}-shape",
|
||||
"x".join(str(dim) for dim in tensor.shape),
|
||||
"--save-csv-dir",
|
||||
str(reference_dir),
|
||||
]
|
||||
subprocess.run(runner_cmd, cwd=runner_build_dir, check=True)
|
||||
|
||||
write_inputs_to_memory_bin(pim_dir / "memory.bin", pim_dir / "config.json", [tensor])
|
||||
dump_ranges = build_dump_ranges(pim_dir / "config.json", output_descriptors)
|
||||
output_bin_path = simulation_dir / "out.bin"
|
||||
run_pim_simulator(
|
||||
args.simulator_dir,
|
||||
pim_dir,
|
||||
output_bin_path,
|
||||
dump_ranges,
|
||||
timeout_sec=args.command_timeout_seconds,
|
||||
)
|
||||
|
||||
output_index, output_name, output_dtype_code, output_shape = output_descriptors[0]
|
||||
output_dtype = np.dtype(_ONNX_TO_NP[output_dtype_code])
|
||||
reference_csv = reference_dir / f"output{output_index}_{sanitize_output_name(output_name)}.csv"
|
||||
reference_output = np.loadtxt(reference_csv, delimiter=",", dtype=output_dtype).reshape(output_shape)
|
||||
simulator_output = parse_pim_simulator_outputs(output_bin_path, output_descriptors)[0]
|
||||
return reference_output, simulator_output
|
||||
|
||||
|
||||
def print_topk(title: str, results):
|
||||
print(title)
|
||||
for rank, result in enumerate(results, start=1):
|
||||
label_text = result["label"] if result["label"] else f'class {result["index"]}'
|
||||
print(f' {rank}. {label_text} ({result["probability"] * 100.0:.2f}%) [index={result["index"]}]')
|
||||
|
||||
|
||||
def main():
|
||||
defaults = resolve_default_paths()
|
||||
|
||||
parser = argparse.ArgumentParser(description="Run a VGG or ResNet ONNX model through the Raptor simulator and annotate the image with top classification results.")
|
||||
parser.add_argument("--model", type=Path, default=None)
|
||||
parser.add_argument("--network", choices=("resnet", "vgg"), default=None)
|
||||
parser.add_argument("--image", type=Path, required=True)
|
||||
parser.add_argument("--labels", type=Path, default=None)
|
||||
parser.add_argument("--output", type=Path, required=True)
|
||||
parser.add_argument("--raptor-path", type=Path, default=defaults["raptor_path"])
|
||||
parser.add_argument("--onnx-include-dir", type=Path, default=defaults["onnx_include_dir"])
|
||||
parser.add_argument("--simulator-dir", type=Path, default=defaults["simulator_dir"])
|
||||
parser.add_argument("--crossbar-size", type=int, default=2048)
|
||||
parser.add_argument("--crossbar-count", type=int, default=256)
|
||||
parser.add_argument("--core-count", type=int, default=1000)
|
||||
parser.add_argument("--top-k", type=int, default=5)
|
||||
parser.add_argument("--command-timeout-seconds", type=float, default=7200.0)
|
||||
parser.add_argument("--skip-compile", action="store_true")
|
||||
parser.add_argument("--verbose", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
args.model = resolve_model_path(args.network, args.model)
|
||||
args.image = args.image.resolve()
|
||||
args.output = args.output.resolve()
|
||||
args.labels = args.labels.resolve() if args.labels else None
|
||||
args.raptor_path = args.raptor_path.resolve()
|
||||
args.onnx_include_dir = args.onnx_include_dir.resolve()
|
||||
args.simulator_dir = args.simulator_dir.resolve()
|
||||
|
||||
if not args.skip_compile:
|
||||
ensure_local_artifacts(args, args.model)
|
||||
else:
|
||||
ensure_existing_artifacts(args.model.parent)
|
||||
|
||||
original_image, tensor = preprocess_classification_image(args.image)
|
||||
labels = load_labels(args.labels)
|
||||
reference_output, simulator_output = run_reference_and_simulator(args, args.model, tensor)
|
||||
reference_results = decode_classification_output(reference_output, labels, args.top_k)
|
||||
simulator_results = decode_classification_output(simulator_output, labels, args.top_k)
|
||||
|
||||
print_topk("Reference top-k:", reference_results)
|
||||
print_topk("Simulator top-k:", simulator_results)
|
||||
|
||||
reference_scores = np.asarray(reference_output, dtype=np.float64).reshape(-1)
|
||||
simulator_scores = np.asarray(simulator_output, dtype=np.float64).reshape(-1)
|
||||
max_abs_diff = float(np.max(np.abs(reference_scores - simulator_scores)))
|
||||
print(f"Max absolute score diff: {max_abs_diff:.6e}")
|
||||
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
draw_classification_panel(original_image, simulator_results, args.output)
|
||||
print(f"Annotated image saved to {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user