426 lines
16 KiB
Python
426 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import shlex
|
|
import subprocess
|
|
import tempfile
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
from PIL import Image, ImageDraw
|
|
|
|
|
|
COCO80_CLASSES = [
|
|
"person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light",
|
|
"fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
|
|
"elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
|
|
"skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard",
|
|
"tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
|
|
"sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
|
|
"potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard",
|
|
"cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase",
|
|
"scissors", "teddy bear", "hair drier", "toothbrush",
|
|
]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ImageCase:
|
|
name: str
|
|
url: str
|
|
expected_label: str
|
|
|
|
|
|
IMAGE_CASES = [
|
|
ImageCase(
|
|
name="cat_coco_39769",
|
|
url="http://images.cocodataset.org/val2017/000000039769.jpg",
|
|
expected_label="cat",
|
|
),
|
|
ImageCase(
|
|
name="dog_pytorch_hub",
|
|
url="https://github.com/pytorch/hub/raw/master/images/dog.jpg",
|
|
expected_label="dog",
|
|
),
|
|
ImageCase(
|
|
name="cute_kitty",
|
|
url="https://images.unsplash.com/photo-1529778873920-4da4926a72c2?q=80&w=872&auto=format&fit=crop&ixlib=rb-4.1.0&ixid=M3wxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8fA%3D%3D" ,
|
|
expected_label="cat",
|
|
),
|
|
|
|
]
|
|
|
|
|
|
def run(cmd, *, cwd=None, capture_output=False, input_bytes=None):
|
|
return subprocess.run(
|
|
cmd,
|
|
cwd=cwd,
|
|
check=True,
|
|
input=input_bytes,
|
|
capture_output=capture_output,
|
|
)
|
|
|
|
|
|
def ssh_command(ssh_key: str, remote_host: str, command: str):
|
|
return ["ssh", "-i", ssh_key, remote_host, command]
|
|
|
|
|
|
def remote_bash(ssh_key: str, remote_host: str, command: str, *, capture_output=False, input_bytes=None):
|
|
return run(
|
|
ssh_command(ssh_key, remote_host, command),
|
|
capture_output=capture_output,
|
|
input_bytes=input_bytes,
|
|
)
|
|
|
|
|
|
def download_image(url: str, path: Path):
|
|
with urllib.request.urlopen(url) as response:
|
|
path.write_bytes(response.read())
|
|
|
|
|
|
def letterbox_rgb(image: Image.Image, size: int = 640) -> np.ndarray:
|
|
image = image.convert("RGB")
|
|
width, height = image.size
|
|
scale = min(size / width, size / height)
|
|
resized_width = max(1, int(round(width * scale)))
|
|
resized_height = max(1, int(round(height * scale)))
|
|
resized = image.resize((resized_width, resized_height), Image.Resampling.BILINEAR)
|
|
|
|
canvas = Image.new("RGB", (size, size), (114, 114, 114))
|
|
offset_x = (size - resized_width) // 2
|
|
offset_y = (size - resized_height) // 2
|
|
canvas.paste(resized, (offset_x, offset_y))
|
|
|
|
array = np.asarray(canvas, dtype=np.float32) / 255.0
|
|
chw = np.transpose(array, (2, 0, 1))
|
|
return np.expand_dims(chw, axis=0)
|
|
|
|
|
|
def letterbox_params(width: int, height: int, size: int = 640):
|
|
scale = min(size / width, size / height)
|
|
resized_width = max(1, int(round(width * scale)))
|
|
resized_height = max(1, int(round(height * scale)))
|
|
offset_x = (size - resized_width) // 2
|
|
offset_y = (size - resized_height) // 2
|
|
return scale, offset_x, offset_y
|
|
|
|
|
|
def save_tensor_csv(array: np.ndarray, path: Path):
|
|
flat = array.reshape(-1)
|
|
np.savetxt(path, flat[np.newaxis, :], delimiter=",", fmt="%.9g")
|
|
|
|
|
|
def iou_xyxy(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
|
|
x1 = np.maximum(box[0], boxes[:, 0])
|
|
y1 = np.maximum(box[1], boxes[:, 1])
|
|
x2 = np.minimum(box[2], boxes[:, 2])
|
|
y2 = np.minimum(box[3], boxes[:, 3])
|
|
|
|
inter_w = np.maximum(0.0, x2 - x1)
|
|
inter_h = np.maximum(0.0, y2 - y1)
|
|
inter = inter_w * inter_h
|
|
|
|
area_box = np.maximum(0.0, box[2] - box[0]) * np.maximum(0.0, box[3] - box[1])
|
|
area_boxes = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
|
|
union = area_box + area_boxes - inter
|
|
return np.divide(inter, union, out=np.zeros_like(inter), where=union > 0)
|
|
|
|
|
|
def decode_yolo_output(
|
|
output: np.ndarray,
|
|
*,
|
|
conf_threshold: float = 0.25,
|
|
iou_threshold: float = 0.45,
|
|
max_detections: int = 50,
|
|
):
|
|
predictions = output[0].T
|
|
boxes_xywh = predictions[:, :4]
|
|
class_scores = predictions[:, 4:]
|
|
|
|
class_ids = np.argmax(class_scores, axis=1)
|
|
confidences = class_scores[np.arange(class_scores.shape[0]), class_ids]
|
|
keep = confidences >= conf_threshold
|
|
|
|
if not np.any(keep):
|
|
return []
|
|
|
|
boxes_xywh = boxes_xywh[keep]
|
|
class_ids = class_ids[keep]
|
|
confidences = confidences[keep]
|
|
|
|
boxes_xyxy = np.empty_like(boxes_xywh)
|
|
boxes_xyxy[:, 0] = boxes_xywh[:, 0] - boxes_xywh[:, 2] / 2.0
|
|
boxes_xyxy[:, 1] = boxes_xywh[:, 1] - boxes_xywh[:, 3] / 2.0
|
|
boxes_xyxy[:, 2] = boxes_xywh[:, 0] + boxes_xywh[:, 2] / 2.0
|
|
boxes_xyxy[:, 3] = boxes_xywh[:, 1] + boxes_xywh[:, 3] / 2.0
|
|
|
|
detections = []
|
|
for class_id in np.unique(class_ids):
|
|
class_mask = class_ids == class_id
|
|
class_boxes = boxes_xyxy[class_mask]
|
|
class_scores_masked = confidences[class_mask]
|
|
order = np.argsort(-class_scores_masked)
|
|
|
|
while order.size > 0:
|
|
best = order[0]
|
|
detections.append({
|
|
"label": COCO80_CLASSES[int(class_id)],
|
|
"class_id": int(class_id),
|
|
"confidence": float(class_scores_masked[best]),
|
|
"box_xyxy": class_boxes[best].tolist(),
|
|
})
|
|
|
|
if order.size == 1:
|
|
break
|
|
|
|
rest = order[1:]
|
|
overlaps = iou_xyxy(class_boxes[best], class_boxes[rest])
|
|
order = rest[overlaps <= iou_threshold]
|
|
|
|
detections.sort(key=lambda det: det["confidence"], reverse=True)
|
|
return detections[:max_detections]
|
|
|
|
|
|
def top_unique_labels(detections, limit: int = 5):
|
|
labels = []
|
|
seen = set()
|
|
for det in detections:
|
|
label = det["label"]
|
|
if label in seen:
|
|
continue
|
|
seen.add(label)
|
|
labels.append(label)
|
|
if len(labels) == limit:
|
|
break
|
|
return labels
|
|
|
|
|
|
def clamp_box_xyxy(box_xyxy, width: int, height: int):
|
|
x1, y1, x2, y2 = box_xyxy
|
|
return [
|
|
max(0.0, min(float(width - 1), float(x1))),
|
|
max(0.0, min(float(height - 1), float(y1))),
|
|
max(0.0, min(float(width - 1), float(x2))),
|
|
max(0.0, min(float(height - 1), float(y2))),
|
|
]
|
|
|
|
|
|
def unletterbox_box_xyxy(box_xyxy, width: int, height: int, size: int = 640):
|
|
scale, offset_x, offset_y = letterbox_params(width, height, size=size)
|
|
x1, y1, x2, y2 = box_xyxy
|
|
return [
|
|
(float(x1) - offset_x) / scale,
|
|
(float(y1) - offset_y) / scale,
|
|
(float(x2) - offset_x) / scale,
|
|
(float(y2) - offset_y) / scale,
|
|
]
|
|
|
|
|
|
def draw_detections(image_path: Path, detections, output_path: Path, *, limit: int = 10):
|
|
image = Image.open(image_path).convert("RGB")
|
|
draw = ImageDraw.Draw(image)
|
|
width, height = image.size
|
|
|
|
for det in detections[:limit]:
|
|
box = unletterbox_box_xyxy(det["box_xyxy"], width, height)
|
|
box = clamp_box_xyxy(box, width, height)
|
|
label = f'{det["label"]} {det["confidence"]:.2f}'
|
|
draw.rectangle(box, outline=(255, 0, 0), width=3)
|
|
text_box = draw.textbbox((box[0], box[1]), label)
|
|
text_bg = [
|
|
text_box[0] - 2,
|
|
text_box[1] - 2,
|
|
text_box[2] + 2,
|
|
text_box[3] + 2,
|
|
]
|
|
draw.rectangle(text_bg, fill=(255, 0, 0))
|
|
draw.text((box[0], box[1]), label, fill=(255, 255, 255))
|
|
|
|
image.save(output_path)
|
|
|
|
|
|
def ensure_remote_artifacts(args):
|
|
remote_project = shlex.quote(args.remote_project)
|
|
remote_python = shlex.quote(args.remote_python)
|
|
validate_cmd = (
|
|
f"export PATH=$HOME/.cargo/bin:$PATH && "
|
|
f"cd {remote_project} && "
|
|
f"{remote_python} validation/validate.py "
|
|
f"--raptor-path build_release/Release/bin/onnx-mlir "
|
|
f"--onnx-include-dir onnx-mlir/include "
|
|
f"--operations-dir {shlex.quote(args.network_dir)} "
|
|
f"--crossbar-size {args.crossbar_size} "
|
|
f"--crossbar-count {args.crossbar_count} "
|
|
f"--core-count {args.core_count} "
|
|
f"--command-timeout-seconds {args.command_timeout_seconds} "
|
|
f"--compile-only"
|
|
)
|
|
remote_bash(args.ssh_key, args.remote_host, validate_cmd)
|
|
|
|
|
|
def remote_case_paths(args, case_name: str):
|
|
network_dir = Path(args.network_dir)
|
|
root = Path(args.remote_project) / network_dir
|
|
return {
|
|
"root": root,
|
|
"runner": root / "runner" / "build" / "runner",
|
|
"runner_build": root / "runner" / "build",
|
|
"raptor_pim": root / "raptor" / "pim",
|
|
"real_root": root / "real_image_validation",
|
|
"input_csv": root / "real_image_validation" / "inputs" / f"{case_name}.csv",
|
|
"ref_dir": root / "real_image_validation" / "reference" / case_name,
|
|
"sim_dir": root / "real_image_validation" / "simulation" / case_name,
|
|
"sim_bin": root / "real_image_validation" / "simulation" / case_name / "out.bin",
|
|
}
|
|
|
|
|
|
def write_remote_file(args, remote_path: Path, data: bytes):
|
|
command = (
|
|
f"mkdir -p {shlex.quote(str(remote_path.parent))} && "
|
|
f"cat > {shlex.quote(str(remote_path))}"
|
|
)
|
|
remote_bash(args.ssh_key, args.remote_host, command, input_bytes=data)
|
|
|
|
|
|
def run_remote_reference_and_simulator(args, case_name: str):
|
|
paths = remote_case_paths(args, case_name)
|
|
quoted_project = shlex.quote(args.remote_project)
|
|
quoted_python = shlex.quote(args.remote_python)
|
|
quoted_case_csv = shlex.quote(str(paths["input_csv"]))
|
|
quoted_ref_dir = shlex.quote(str(paths["ref_dir"]))
|
|
quoted_sim_dir = shlex.quote(str(paths["sim_dir"]))
|
|
quoted_sim_bin = shlex.quote(str(paths["sim_bin"]))
|
|
quoted_runner = shlex.quote(str(paths["runner"]))
|
|
quoted_runner_build = shlex.quote(str(paths["runner_build"]))
|
|
quoted_pim = shlex.quote(str(paths["raptor_pim"]))
|
|
|
|
command = f"""
|
|
set -e
|
|
export PATH=$HOME/.cargo/bin:$PATH
|
|
cd {quoted_project}
|
|
mkdir -p {quoted_ref_dir} {quoted_sim_dir}
|
|
cd {quoted_runner_build}
|
|
{quoted_runner} --in0-csv-file {quoted_case_csv} --in0-shape 1x3x640x640 --save-csv-dir {quoted_ref_dir}
|
|
cd {quoted_project}
|
|
{quoted_python} - <<'PY'
|
|
import json
|
|
import numpy as np
|
|
from pathlib import Path
|
|
input_csv = Path({json.dumps(str(paths["input_csv"]))})
|
|
pim_dir = Path({json.dumps(str(paths["raptor_pim"]))})
|
|
config = json.loads((pim_dir / "config.json").read_text())
|
|
tensor = np.loadtxt(input_csv, delimiter=",", dtype=np.float32).reshape(1, 3, 640, 640)
|
|
with open(pim_dir / "memory.bin", "r+b") as f:
|
|
f.seek(config["inputs_addresses"][0])
|
|
f.write(tensor.tobytes(order="C"))
|
|
output_addr = config["outputs_addresses"][0]
|
|
output_size = 1 * 84 * 8400 * 4
|
|
print(f"{{output_addr}},{{output_size}}")
|
|
PY
|
|
"""
|
|
result = remote_bash(args.ssh_key, args.remote_host, command, capture_output=True)
|
|
dump_range = result.stdout.decode().strip().splitlines()[-1]
|
|
|
|
sim_command = (
|
|
f"export PATH=$HOME/.cargo/bin:$PATH && "
|
|
f"cd {quoted_project}/backend-simulators/pim/pim-simulator && "
|
|
f"cargo run --no-default-features --release --package pim-simulator --bin pim-simulator -- "
|
|
f"-f {quoted_pim} -o {quoted_sim_bin} -d {dump_range}"
|
|
)
|
|
remote_bash(args.ssh_key, args.remote_host, sim_command)
|
|
return paths
|
|
|
|
|
|
def read_remote_file(args, remote_path: Path) -> bytes:
|
|
result = remote_bash(
|
|
args.ssh_key,
|
|
args.remote_host,
|
|
f"cat {shlex.quote(str(remote_path))}",
|
|
capture_output=True,
|
|
)
|
|
return result.stdout
|
|
|
|
|
|
def analyze_case(args, case: ImageCase, work_dir: Path):
|
|
image_path = work_dir / f"{case.name}{Path(case.url).suffix or '.img'}"
|
|
csv_path = work_dir / f"{case.name}.csv"
|
|
annotated_dir = Path(args.annotated_dir)
|
|
annotated_dir.mkdir(parents=True, exist_ok=True)
|
|
download_image(case.url, image_path)
|
|
tensor = letterbox_rgb(Image.open(image_path))
|
|
save_tensor_csv(tensor, csv_path)
|
|
|
|
remote_paths = remote_case_paths(args, case.name)
|
|
write_remote_file(args, remote_paths["input_csv"], csv_path.read_bytes())
|
|
remote_paths = run_remote_reference_and_simulator(args, case.name)
|
|
|
|
ref_csv = read_remote_file(args, remote_paths["ref_dir"] / "output0_output0.csv")
|
|
sim_bin = read_remote_file(args, remote_paths["sim_bin"])
|
|
|
|
ref = np.loadtxt(ref_csv.decode().splitlines(), delimiter=",", dtype=np.float32).reshape(1, 84, 8400)
|
|
sim = np.frombuffer(sim_bin, dtype=np.float32, count=1 * 84 * 8400).reshape(1, 84, 8400)
|
|
abs_diff = np.abs(sim.astype(np.float64) - ref.astype(np.float64))
|
|
rel_diff = abs_diff / np.maximum(np.abs(ref.astype(np.float64)), 1e-12)
|
|
|
|
ref_detections = decode_yolo_output(ref)
|
|
sim_detections = decode_yolo_output(sim)
|
|
ref_labels = top_unique_labels(ref_detections)
|
|
sim_labels = top_unique_labels(sim_detections)
|
|
ref_image_path = annotated_dir / f"{case.name}_reference.png"
|
|
sim_image_path = annotated_dir / f"{case.name}_simulator.png"
|
|
draw_detections(image_path, ref_detections, ref_image_path)
|
|
draw_detections(image_path, sim_detections, sim_image_path)
|
|
|
|
return {
|
|
"case": case.name,
|
|
"expected_label": case.expected_label,
|
|
"ref_top_labels": ref_labels,
|
|
"sim_top_labels": sim_labels,
|
|
"top1_match": bool(ref_labels and sim_labels and ref_labels[0] == sim_labels[0]),
|
|
"expected_in_ref": case.expected_label in ref_labels,
|
|
"expected_in_sim": case.expected_label in sim_labels,
|
|
"max_abs_diff": float(abs_diff.max()),
|
|
"mean_abs_diff": float(abs_diff.mean()),
|
|
"max_rel_diff": float(rel_diff.max()),
|
|
"mean_rel_diff": float(rel_diff.mean()),
|
|
"reference_annotated_image": str(ref_image_path),
|
|
"simulator_annotated_image": str(sim_image_path),
|
|
"ref_top_detections": ref_detections[:5],
|
|
"sim_top_detections": sim_detections[:5],
|
|
}
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Validate YOLO detections on real animal images against the simulator.")
|
|
parser.add_argument("--remote-host", default="gmagnani@monolith")
|
|
parser.add_argument("--ssh-key", default="~/.ssh/github")
|
|
parser.add_argument("--remote-project", default="/home/gmagnani/Project/Raptor")
|
|
parser.add_argument("--remote-python", default="/home/gmagnani/venv/bin/python")
|
|
parser.add_argument("--network-dir", default="validation/networks/yolo11n/depth_51")
|
|
parser.add_argument("--crossbar-size", type=int, default=2048)
|
|
parser.add_argument("--crossbar-count", type=int, default=256)
|
|
parser.add_argument("--core-count", type=int, default=1000)
|
|
parser.add_argument("--command-timeout-seconds", type=int, default=7200)
|
|
parser.add_argument("--skip-compile", action="store_true")
|
|
parser.add_argument("--annotated-dir", default="validation/networks/yolo11n/depth_51/real_image_validation/annotated")
|
|
args = parser.parse_args()
|
|
|
|
args.ssh_key = str(Path(args.ssh_key).expanduser())
|
|
|
|
if not args.skip_compile:
|
|
ensure_remote_artifacts(args)
|
|
|
|
reports = []
|
|
with tempfile.TemporaryDirectory(prefix="yolo_real_images_") as tmp_dir:
|
|
work_dir = Path(tmp_dir)
|
|
for case in IMAGE_CASES:
|
|
reports.append(analyze_case(args, case, work_dir))
|
|
|
|
print(json.dumps({"network_dir": args.network_dir, "cases": reports}, indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|