Merge branch 'main' of chef.heaplab.deib.polimi.it:nnicolosi/Raptor
Validate Operations / validate-operations (push) Has been cancelled
Validate Operations / validate-operations (push) Has been cancelled
This commit is contained in:
@@ -138,7 +138,7 @@ static Value padHVectorInputToCrossbarSize(IRRewriter& rewriter, Location loc, V
|
||||
}
|
||||
|
||||
void SpatialToPimPass::runOnOperation() {
|
||||
coreId = 1;
|
||||
coreId = 0;
|
||||
ModuleOp moduleOp = getOperation();
|
||||
MLIRContext* ctx = moduleOp.getContext();
|
||||
|
||||
|
||||
@@ -480,8 +480,8 @@ LogicalResult SpatComputeBatch::verify() {
|
||||
return emitError("compute_batch coreIds attribute must be a dense i32 array");
|
||||
if (coreIdsAttr.size() != static_cast<int64_t>(laneCountSz))
|
||||
return emitError("compute_batch coreIds array length must match laneCount");
|
||||
if (llvm::any_of(coreIdsAttr.asArrayRef(), [](int32_t coreId) { return coreId <= 0; }))
|
||||
return emitError("compute_batch coreIds values must be positive");
|
||||
if (llvm::any_of(coreIdsAttr.asArrayRef(), [](int32_t coreId) { return coreId < 0; }))
|
||||
return emitError("compute_batch coreIds values must be non-negative");
|
||||
llvm::SmallDenseSet<int32_t, 8> seenCoreIds;
|
||||
for (int32_t coreId : coreIdsAttr.asArrayRef())
|
||||
if (!seenCoreIds.insert(coreId).second)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
#include "mlir/Dialect/Arith/IR/Arith.h"
|
||||
#include "mlir/Dialect/Tensor/IR/Tensor.h"
|
||||
#include "mlir/IR/IRMapping.h"
|
||||
#include "mlir/IR/PatternMatch.h"
|
||||
|
||||
@@ -35,8 +34,6 @@ using spatial::getComputeInstanceTemplateBlock;
|
||||
using spatial::getComputeInstanceWeights;
|
||||
using spatial::getProducerValueRef;
|
||||
|
||||
static int32_t getPhysicalCoreId(size_t schedulerCpu) { return static_cast<int32_t>(schedulerCpu + 1); }
|
||||
|
||||
class MergeScheduleMaterializerImpl {
|
||||
public:
|
||||
explicit MergeScheduleMaterializerImpl(func::FuncOp funcOp)
|
||||
@@ -64,10 +61,8 @@ public:
|
||||
private:
|
||||
struct ScheduledTask {
|
||||
ComputeInstance computeInstance;
|
||||
Operation* sourceOp = nullptr;
|
||||
size_t cpu = 0;
|
||||
size_t order = 0;
|
||||
size_t executionOrder = 0;
|
||||
size_t orderWithinCpu = 0;
|
||||
};
|
||||
|
||||
struct ChannelInfo {
|
||||
@@ -78,7 +73,6 @@ private:
|
||||
|
||||
struct CpuProgram {
|
||||
SpatCompute op;
|
||||
Block* block = nullptr;
|
||||
DenseMap<Value, Value> externalInputMap;
|
||||
DenseMap<Value, size_t> weightToIndex;
|
||||
};
|
||||
@@ -103,43 +97,6 @@ private:
|
||||
| static_cast<uint32_t>(channelInfo.targetCoreId);
|
||||
}
|
||||
|
||||
static void appendUniqueValue(SmallVectorImpl<Value>& values, DenseSet<Value>& seen, Value value) {
|
||||
if (seen.insert(value).second)
|
||||
values.push_back(value);
|
||||
}
|
||||
|
||||
bool isOldComputeResult(Operation* op) {
|
||||
auto it = isInternalInputOpCache.find(op);
|
||||
if (it != isInternalInputOpCache.end())
|
||||
return it->second;
|
||||
|
||||
auto extract = dyn_cast_or_null<tensor::ExtractSliceOp>(op);
|
||||
if (!extract)
|
||||
return isInternalInputOpCache[op] = false;
|
||||
|
||||
for (Value result : extract->getResults()) {
|
||||
for (Operation* user : result.getUsers()) {
|
||||
if (oldComputeOps.contains(user))
|
||||
continue;
|
||||
if (isOldComputeResult(user))
|
||||
continue;
|
||||
return isInternalInputOpCache[op] = false;
|
||||
}
|
||||
}
|
||||
return isInternalInputOpCache[op] = true;
|
||||
}
|
||||
|
||||
void collectInternalInputOps(Value value) {
|
||||
Operation* op = value.getDefiningOp();
|
||||
//TODO ExtractSliceOp is not the only legal host op to traverse! dio
|
||||
while (auto extract = dyn_cast_if_present<tensor::ExtractSliceOp>(op)) {
|
||||
if (isOldComputeResult(extract.getOperation()))
|
||||
internalInputOpsToErase.insert(extract.getOperation());
|
||||
value = extract.getSource();
|
||||
op = value.getDefiningOp();
|
||||
}
|
||||
}
|
||||
|
||||
void collectExternalUsers(Operation* op) {
|
||||
if (!externalUsersToMove.insert(op).second)
|
||||
return;
|
||||
@@ -153,14 +110,11 @@ private:
|
||||
}
|
||||
|
||||
void collectScheduledTasks() {
|
||||
size_t nextOrder = 0;
|
||||
for (ComputeInstance scheduledInstance : schedule->dominanceOrderCompute) {
|
||||
oldComputeOps.insert(scheduledInstance.op);
|
||||
scheduledTasks.push_back({scheduledInstance,
|
||||
scheduledInstance.op,
|
||||
schedule->computeToCpuMap.lookup(scheduledInstance),
|
||||
schedule->computeToCpuSlotMap.lookup(scheduledInstance),
|
||||
nextOrder++});
|
||||
schedule->computeToCpuSlotMap.lookup(scheduledInstance)});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,14 +131,10 @@ private:
|
||||
}
|
||||
|
||||
llvm::sort(orderedCpus);
|
||||
for (size_t cpu : orderedCpus) {
|
||||
llvm::stable_sort(tasksByCpu[cpu],
|
||||
[&](const ScheduledTask& lhs, const ScheduledTask& rhs) { return lhs.order < rhs.order; });
|
||||
for (auto [executionOrder, task] : llvm::enumerate(tasksByCpu[cpu])) {
|
||||
task.executionOrder = executionOrder;
|
||||
taskByComputeInstance[task.computeInstance].executionOrder = executionOrder;
|
||||
}
|
||||
}
|
||||
for (size_t cpu : orderedCpus)
|
||||
llvm::stable_sort(tasksByCpu[cpu], [&](const ScheduledTask& lhs, const ScheduledTask& rhs) {
|
||||
return lhs.orderWithinCpu < rhs.orderWithinCpu;
|
||||
});
|
||||
}
|
||||
|
||||
void collectExternalInputsAndWeights() {
|
||||
@@ -203,26 +153,26 @@ private:
|
||||
for (auto [inputIndex, input] : llvm::enumerate(taskInputs)) {
|
||||
auto producerRef = getProducerValueRef(input);
|
||||
if (producerRef) {
|
||||
collectInternalInputOps(input);
|
||||
auto producerIt = taskByComputeInstance.find(producerRef->instance);
|
||||
if (producerIt != taskByComputeInstance.end()) {
|
||||
if (producerIt->second.cpu != cpu) {
|
||||
ChannelInfo info {
|
||||
(*nextChannelId)++,
|
||||
getPhysicalCoreId(producerIt->second.cpu),
|
||||
getPhysicalCoreId(cpu),
|
||||
static_cast<int32_t>(producerIt->second.cpu),
|
||||
static_cast<int32_t>(cpu),
|
||||
};
|
||||
remoteInputs[inputIndex] = info;
|
||||
auto& perResultChannels = remoteSendsByTask[producerRef->instance];
|
||||
if (perResultChannels.empty())
|
||||
perResultChannels.resize(getComputeInstanceOutputTypes(producerIt->second.computeInstance).size());
|
||||
perResultChannels[producerRef->resultIndex].push_back(
|
||||
{info, task.computeInstance, inputIndex, task.executionOrder, 0});
|
||||
{info, task.computeInstance, inputIndex, task.orderWithinCpu, 0});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
appendUniqueValue(cpuExternalInputs[cpu], seenExternalInputsByCpu[cpu], input);
|
||||
if (seenExternalInputsByCpu[cpu].insert(input).second)
|
||||
cpuExternalInputs[cpu].push_back(input);
|
||||
}
|
||||
|
||||
auto taskOutputs = getComputeInstanceOutputValues(task.computeInstance);
|
||||
@@ -314,7 +264,7 @@ private:
|
||||
uint64_t pairKey = getRemoteSendPairKey(sendInfo.channelInfo);
|
||||
if (!pairsNeedingReceiveReorder.contains(pairKey))
|
||||
continue;
|
||||
size_t targetCpu = static_cast<size_t>(sendInfo.channelInfo.targetCoreId - 1);
|
||||
size_t targetCpu = static_cast<size_t>(sendInfo.channelInfo.targetCoreId);
|
||||
receiveQueuesByCpu[targetCpu][pairKey].push_back(
|
||||
{sendInfo.channelInfo, sendInfo.consumer, sendInfo.inputIndex, sendInfo.sourceOrder});
|
||||
}
|
||||
@@ -351,7 +301,7 @@ private:
|
||||
auto newCompute = SpatCompute::create(rewriter, loc, TypeRange(resultTypes), ValueRange(operands));
|
||||
newCompute.getProperties().setOperandSegmentSizes(
|
||||
{static_cast<int>(cpuWeights[cpu].size()), static_cast<int>(cpuExternalInputs[cpu].size())});
|
||||
newCompute->setAttr(onnx_mlir::kCoreIdAttrName, rewriter.getI32IntegerAttr(getPhysicalCoreId(cpu)));
|
||||
newCompute->setAttr(onnx_mlir::kCoreIdAttrName, rewriter.getI32IntegerAttr(static_cast<int32_t>(cpu)));
|
||||
|
||||
SmallVector<Type> blockArgTypes;
|
||||
SmallVector<Location> blockArgLocs;
|
||||
@@ -366,7 +316,6 @@ private:
|
||||
|
||||
CpuProgram program;
|
||||
program.op = newCompute;
|
||||
program.block = newBlock;
|
||||
for (auto [weightIndex, weight] : llvm::enumerate(cpuWeights[cpu]))
|
||||
program.weightToIndex[weight] = weightIndex;
|
||||
for (auto [inputIndex, input] : llvm::enumerate(cpuExternalInputs[cpu]))
|
||||
@@ -428,7 +377,7 @@ private:
|
||||
for (size_t cpu : orderedCpus) {
|
||||
CpuProgram& program = cpuPrograms[cpu];
|
||||
IRRewriter rewriter(func.getContext());
|
||||
rewriter.setInsertionPointToEnd(program.block);
|
||||
rewriter.setInsertionPointToEnd(&program.op.getBody().front());
|
||||
DenseMap<uint64_t, size_t> receiveQueueIndices;
|
||||
DenseMap<ComputeInstance, SmallVector<Value>> preReceivedInputsByTask;
|
||||
|
||||
@@ -458,7 +407,7 @@ private:
|
||||
if (producerIt->second.cpu == cpu) {
|
||||
auto producedIt = producedValuesByTask.find(producerRef->instance);
|
||||
if (producedIt == producedValuesByTask.end() || producedIt->second.size() <= producerRef->resultIndex) {
|
||||
task.sourceOp->emitOpError("missing local producer value during per-cpu merge materialization")
|
||||
task.computeInstance.op->emitOpError("missing local producer value during per-cpu merge materialization")
|
||||
<< " consumerCpu=" << cpu << " producerCpu=" << producerIt->second.cpu
|
||||
<< " producerLaneStart=" << producerRef->instance.laneStart
|
||||
<< " producerLaneCount=" << producerRef->instance.laneCount;
|
||||
@@ -482,7 +431,7 @@ private:
|
||||
task.computeInstance,
|
||||
inputIndex);
|
||||
if (failed(received)) {
|
||||
task.sourceOp->emitOpError("failed to materialize reordered remote receive")
|
||||
task.computeInstance.op->emitOpError("failed to materialize reordered remote receive")
|
||||
<< " consumerCpu=" << cpu << " sourceCoreId=" << channelInfo.sourceCoreId
|
||||
<< " targetCoreId=" << channelInfo.targetCoreId << " channelId=" << channelInfo.channelId;
|
||||
return failure();
|
||||
@@ -505,8 +454,8 @@ private:
|
||||
}
|
||||
|
||||
SmallVector<Value> taskYieldValues;
|
||||
rewriter.setInsertionPointToEnd(program.block);
|
||||
if (isa<SpatCompute>(task.sourceOp)) {
|
||||
rewriter.setInsertionPointToEnd(&program.op.getBody().front());
|
||||
if (isa<SpatCompute>(task.computeInstance.op)) {
|
||||
IRMapping mapper;
|
||||
for (auto [argIndex, oldArg] : llvm::enumerate(templateBlock.getArguments()))
|
||||
mapper.map(oldArg, resolvedInputs[argIndex]);
|
||||
@@ -547,7 +496,8 @@ private:
|
||||
Operation* clonedOp = rewriter.clone(op, mapper);
|
||||
if (auto oldWeightedMvmOp = dyn_cast<spatial::SpatMVMOp>(&op)) {
|
||||
if (oldWeightedMvmOp.getWeightIndex() != 0) {
|
||||
task.sourceOp->emitOpError("batched per-cpu merge materialization expects lane-local weight index 0");
|
||||
task.computeInstance.op->emitOpError(
|
||||
"batched per-cpu merge materialization expects lane-local weight index 0");
|
||||
return failure();
|
||||
}
|
||||
auto newWeightedMvmOp = cast<spatial::SpatMVMOp>(clonedOp);
|
||||
@@ -555,7 +505,8 @@ private:
|
||||
}
|
||||
if (auto oldWeightedVmmOp = dyn_cast<spatial::SpatVMMOp>(&op)) {
|
||||
if (oldWeightedVmmOp.getWeightIndex() != 0) {
|
||||
task.sourceOp->emitOpError("batched per-cpu merge materialization expects lane-local weight index 0");
|
||||
task.computeInstance.op->emitOpError(
|
||||
"batched per-cpu merge materialization expects lane-local weight index 0");
|
||||
return failure();
|
||||
}
|
||||
auto newWeightedVmmOp = cast<spatial::SpatVMMOp>(clonedOp);
|
||||
@@ -589,7 +540,7 @@ private:
|
||||
auto producedIt = producedValuesByTask.find(outputRef.instance);
|
||||
if (producedIt == producedValuesByTask.end() || producedIt->second.size() <= outputRef.resultIndex) {
|
||||
ScheduledTask task = taskByComputeInstance.at(outputRef.instance);
|
||||
task.sourceOp->emitOpError("missing yielded external value during per-cpu merge materialization")
|
||||
task.computeInstance.op->emitOpError("missing yielded external value during per-cpu merge materialization")
|
||||
<< " cpu=" << cpu << " laneStart=" << outputRef.instance.laneStart;
|
||||
return failure();
|
||||
}
|
||||
@@ -610,13 +561,9 @@ private:
|
||||
}
|
||||
|
||||
LogicalResult eraseOldScheduledOps() {
|
||||
DenseSet<Operation*> allOpsToErase = oldComputeOps;
|
||||
for (Operation* op : internalInputOpsToErase)
|
||||
allOpsToErase.insert(op);
|
||||
|
||||
SmallVector<Operation*> orderedOpsToErase;
|
||||
for (Operation& op : func.getBody().front())
|
||||
if (allOpsToErase.contains(&op))
|
||||
if (oldComputeOps.contains(&op))
|
||||
orderedOpsToErase.push_back(&op);
|
||||
|
||||
for (Operation* op : llvm::reverse(orderedOpsToErase)) {
|
||||
@@ -626,10 +573,10 @@ private:
|
||||
remainingUsers.push_back(user);
|
||||
if (!remainingUsers.empty()) {
|
||||
InFlightDiagnostic diagnostic = op->emitOpError("still has uses during per-cpu merge cleanup")
|
||||
<< "; erase-set=" << (allOpsToErase.contains(op) ? "yes" : "no");
|
||||
<< "; erase-set=" << (oldComputeOps.contains(op) ? "yes" : "no");
|
||||
for (Operation* user : remainingUsers) {
|
||||
diagnostic.attachNote(user->getLoc())
|
||||
<< "remaining user " << user->getName() << "; erase-set=" << (allOpsToErase.contains(user) ? "yes" : "no");
|
||||
<< "remaining user " << user->getName() << "; erase-set=" << (oldComputeOps.contains(user) ? "yes" : "no");
|
||||
}
|
||||
return failure();
|
||||
}
|
||||
@@ -663,8 +610,6 @@ private:
|
||||
DenseMap<size_t, SmallVector<ScheduledTask>> tasksByCpu;
|
||||
SmallVector<size_t> orderedCpus;
|
||||
DenseSet<size_t> seenCpus;
|
||||
DenseSet<Operation*> internalInputOpsToErase;
|
||||
DenseMap<Operation*, bool> isInternalInputOpCache;
|
||||
DenseSet<Operation*> externalUsersToMove;
|
||||
DenseMap<ComputeInstance, SmallVector<SmallVector<RemoteSendInfo>>> remoteSendsByTask;
|
||||
DenseMap<ComputeInstance, SmallVector<std::optional<ChannelInfo>>> remoteInputsByTask;
|
||||
|
||||
Reference in New Issue
Block a user