CodexWorkaround
Validate Operations / validate-operations (push) Has been cancelled

This commit is contained in:
ilgeco
2026-06-08 11:33:36 +02:00
parent aec80529ca
commit 75fb70712f
184 changed files with 1127 additions and 7 deletions
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""Check whether a PIM bufferized IR dump matches the YOLO depth_35 tail signature."""
from __future__ import annotations
import argparse
import re
from pathlib import Path
NONZERO_BOX_SUBVIEW_RE = re.compile(
r"memref\.subview .*?: .* to "
r"memref<1x2x8400xf32, strided<\[[0-9]+, 8400, 1\], offset: 16800>>"
)
FINAL_CONCAT_RE = re.compile(
r"pim\.concat axis 1 .*?: "
r"\(memref<1x4x8400xf32(?:, strided<[^>]+>)?>, "
r"memref<1x80x8400xf32(?:, strided<[^>]+>)?>\) -> memref<1x84x8400xf32>"
)
FINAL_OUTPUT_RE = re.compile(r"memref<1x84x8400xf32>")
def resolve_pim1_buff(path_str: str) -> Path:
path = Path(path_str)
if path.is_dir():
candidate = path / "pim1_buff.mlir"
if candidate.is_file():
return candidate
candidate = path / "dialects" / "pim1_buff.mlir"
if candidate.is_file():
return candidate
if path.is_file():
return path
raise FileNotFoundError(f"could not find pim1_buff.mlir under {path}")
def find_line_offsets(text: str) -> list[int]:
offsets = [0]
for match in re.finditer(r"\n", text):
offsets.append(match.end())
return offsets
def line_number(offsets: list[int], position: int) -> int:
lo = 0
hi = len(offsets)
while lo + 1 < hi:
mid = (lo + hi) // 2
if offsets[mid] <= position:
lo = mid
else:
hi = mid
return lo + 1
def extract_block(lines: list[str], center_line: int, radius: int = 1) -> str:
start = max(center_line - radius - 1, 0)
end = min(center_line + radius, len(lines))
return "\n".join(lines[start:end])
def check_signature(path: Path) -> dict[str, object]:
text = path.read_text()
lines = text.splitlines()
offsets = find_line_offsets(text)
subview_match = NONZERO_BOX_SUBVIEW_RE.search(text)
subview_var = None
subview_line = None
subview_snippet = None
if subview_match:
line_no = line_number(offsets, subview_match.start())
subview_line = line_no
subview_snippet = extract_block(lines, line_no, radius=0)
line_text = lines[line_no - 1]
lhs = line_text.split("=", 1)[0].strip()
subview_var = lhs
direct_feed_matches: list[tuple[str, int, str]] = []
if subview_var:
for idx, line in enumerate(lines, start=1):
if subview_var not in line:
continue
if not re.search(r"pim\.vv(add|sub|mul)|pim\.concat", line):
continue
direct_feed_matches.append((line.strip(), idx, extract_block(lines, idx, radius=0)))
subview_used_as_dest = any(re.search(rf"pim\.vv(add|sub|mul)\([^)]*, [^)]*, {re.escape(subview_var)}\)", line)
for line, _, _ in direct_feed_matches) if subview_var else False
final_concat_match = FINAL_CONCAT_RE.search(text)
final_output_shape = bool(FINAL_OUTPUT_RE.search(text))
return {
"path": path,
"has_nonzero_box_subview": bool(subview_match),
"nonzero_box_subview_line": subview_line,
"nonzero_box_subview_snippet": subview_snippet,
"subview_var": subview_var,
"direct_pim_uses": direct_feed_matches,
"subview_used_as_dest": subview_used_as_dest,
"has_final_output_concat": bool(final_concat_match),
"has_final_output_shape": final_output_shape,
}
def print_report(result: dict[str, object]) -> None:
path = result["path"]
print(f"== {path} ==")
print(f"nonzero_box_subview: {result['has_nonzero_box_subview']}")
if result["nonzero_box_subview_snippet"]:
print(result["nonzero_box_subview_snippet"])
direct_uses = result["direct_pim_uses"]
print(f"direct_pim_use_count: {len(direct_uses)}")
for line, line_no, snippet in direct_uses:
print(f"line {line_no}: {line}")
print(snippet)
print(f"subview_used_as_destination: {result['subview_used_as_dest']}")
print(f"final_output_concat_4_80_to_84: {result['has_final_output_concat']}")
print(f"contains_output_shape_1x84x8400: {result['has_final_output_shape']}")
structurally_equivalent = (
result["has_nonzero_box_subview"]
and bool(direct_uses)
and result["has_final_output_concat"]
and result["has_final_output_shape"]
)
print(f"structurally_equivalent_to_yolo_tail: {structurally_equivalent}")
print()
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("paths", nargs="+", help="Workspace, dialect dir, or pim1_buff.mlir to inspect")
args = parser.parse_args()
for path_str in args.paths:
result = check_signature(resolve_pim1_buff(path_str))
print_report(result)
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""Generate output-only ONNX variants for the real yolo11n depth_35 model."""
from __future__ import annotations
import argparse
from pathlib import Path
import onnx
from onnx import TensorProto, helper, shape_inference
ORIGINAL_DEBUG_OUTPUTS = [
"/model.20/act/Mul_output_0",
"/model.9/cv1/act/Mul_output_0",
"/model.8/m.0/m/m.0/cv2/conv/Conv_output_0",
"/model.23/cv3.1/cv3.1.0/cv3.1.0.0/act/Mul_output_0",
"/model.8/m.0/m/m.1/cv1/act/Mul_output_0",
"/model.23/dfl/Transpose_output_0",
"output0",
"/model.23/cv2.1/cv2.1.0/act/Mul_output_0",
]
LOCALIZATION_NODE_NAMES = [
"/model.23/dfl/Transpose",
"/model.23/dfl/Softmax",
"/model.23/dfl/conv/Conv",
"/model.23/dfl/Reshape_1",
"/model.23/Slice",
"/model.23/Slice_1",
"/model.23/Sub",
"/model.23/Add_1",
"/model.23/Sub_1",
"/model.23/Concat_4",
"/model.23/Mul_2",
"/model.23/Sigmoid",
"/model.23/Concat_5",
]
def collect_value_infos(model: onnx.ModelProto) -> dict[str, onnx.ValueInfoProto]:
infos: dict[str, onnx.ValueInfoProto] = {}
for value in list(model.graph.input) + list(model.graph.output) + list(model.graph.value_info):
infos[value.name] = value
return infos
def clone_value_info(value: onnx.ValueInfoProto, name: str | None = None) -> onnx.ValueInfoProto:
cloned = onnx.ValueInfoProto()
cloned.CopyFrom(value)
if name is not None:
cloned.name = name
return cloned
def make_tensor_value_info_from_type(name: str, tensor_type: onnx.TypeProto.Tensor) -> onnx.ValueInfoProto:
shape = []
for dim in tensor_type.shape.dim:
if dim.HasField("dim_value"):
shape.append(dim.dim_value)
elif dim.HasField("dim_param"):
shape.append(dim.dim_param)
else:
shape.append(None)
return helper.make_tensor_value_info(name, tensor_type.elem_type, shape)
def lookup_output_value_info(model: onnx.ModelProto, value_infos: dict[str, onnx.ValueInfoProto], output_name: str) -> onnx.ValueInfoProto:
value = value_infos.get(output_name)
if value is not None:
return clone_value_info(value)
for initializer in model.graph.initializer:
if initializer.name != output_name:
continue
dims = list(initializer.dims)
return helper.make_tensor_value_info(output_name, initializer.data_type, dims)
raise KeyError(f"missing value info for output {output_name}")
def build_model_with_outputs(
base_model: onnx.ModelProto,
inferred_model: onnx.ModelProto,
output_names: list[str],
extra_nodes: list[onnx.NodeProto] | None = None,
extra_value_infos: list[onnx.ValueInfoProto] | None = None,
) -> onnx.ModelProto:
value_infos = collect_value_infos(inferred_model)
if extra_value_infos:
for value in extra_value_infos:
value_infos[value.name] = value
model = onnx.ModelProto()
model.CopyFrom(base_model)
del model.graph.output[:]
for output_name in output_names:
model.graph.output.append(lookup_output_value_info(inferred_model, value_infos, output_name))
if extra_nodes:
model.graph.node.extend(extra_nodes)
if extra_value_infos:
model.graph.value_info.extend(extra_value_infos)
onnx.checker.check_model(model)
return model
def find_node_output(model: onnx.ModelProto, node_name: str) -> str:
for node in model.graph.node:
if node.name == node_name:
return node.output[0]
matching_names = sorted(node.name for node in model.graph.node if "model.23" in node.name and "dfl" in node.name)
suffix = ""
if matching_names:
suffix = "\nmatching /model.23 dfl nodes:\n " + "\n ".join(matching_names)
raise KeyError(f"could not find node named {node_name}{suffix}")
def save_variant(model: onnx.ModelProto, out_dir: Path, variant_name: str) -> None:
variant_dir = out_dir / variant_name
variant_dir.mkdir(parents=True, exist_ok=True)
onnx.save(model, variant_dir / f"{variant_name}.onnx")
def unique_preserving_order(names: list[str]) -> list[str]:
seen: set[str] = set()
unique_names: list[str] = []
for name in names:
if name in seen:
continue
seen.add(name)
unique_names.append(name)
return unique_names
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--input", required=True, help="Path to yolo11n depth_35 ONNX model.")
parser.add_argument("--out-dir", required=True, help="Directory where variants will be generated.")
args = parser.parse_args()
input_path = Path(args.input).resolve()
out_dir = Path(args.out_dir).resolve()
out_dir.mkdir(parents=True, exist_ok=True)
base_model = onnx.load(input_path)
inferred_model = shape_inference.infer_shapes(base_model)
output0_only = build_model_with_outputs(base_model, inferred_model, ["output0"])
save_variant(output0_only, out_dir, "output0_only")
output0_first = build_model_with_outputs(
base_model,
inferred_model,
["output0"] + [name for name in ORIGINAL_DEBUG_OUTPUTS if name != "output0"],
)
save_variant(output0_first, out_dir, "output0_first_with_original_debug_outputs")
output0_last = build_model_with_outputs(
base_model,
inferred_model,
[name for name in ORIGINAL_DEBUG_OUTPUTS if name != "output0"] + ["output0"],
)
save_variant(output0_last, out_dir, "output0_last_with_original_debug_outputs")
identity_name = "output0_identity"
identity_node = helper.make_node("Identity", ["output0"], [identity_name], name="output0_identity_node")
output0_value = lookup_output_value_info(inferred_model, collect_value_infos(inferred_model), "output0")
duplicated = build_model_with_outputs(
base_model,
inferred_model,
["output0", identity_name],
extra_nodes=[identity_node],
extra_value_infos=[make_tensor_value_info_from_type(identity_name, output0_value.type.tensor_type)],
)
save_variant(duplicated, out_dir, "output0_duplicated")
localization_outputs = [
"/model.23/dfl/Transpose_output_0",
find_node_output(base_model, "/model.23/dfl/Softmax"),
find_node_output(base_model, "/model.23/dfl/conv/Conv"),
find_node_output(base_model, "/model.23/dfl/Reshape_1"),
find_node_output(base_model, "/model.23/Slice"),
find_node_output(base_model, "/model.23/Slice_1"),
find_node_output(base_model, "/model.23/Sub"),
find_node_output(base_model, "/model.23/Add_1"),
find_node_output(base_model, "/model.23/Sub_1"),
find_node_output(base_model, "/model.23/Concat_4"),
find_node_output(base_model, "/model.23/Mul_2"),
find_node_output(base_model, "/model.23/Sigmoid"),
find_node_output(base_model, "/model.23/Concat_5"),
"output0",
]
localization = build_model_with_outputs(
base_model,
inferred_model,
unique_preserving_order(localization_outputs),
)
save_variant(localization, out_dir, "yolo_tail_localization_outputs")
return 0
if __name__ == "__main__":
raise SystemExit(main())