This commit is contained in:
@@ -1,402 +0,0 @@
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <queue>
|
||||
#include <random>
|
||||
#include <chrono>
|
||||
#include <omp.h>
|
||||
#include <numeric>
|
||||
#include <fstream>
|
||||
|
||||
using namespace std;
|
||||
|
||||
// ==========================================
|
||||
// 1. DATA STRUCTURES
|
||||
// ==========================================
|
||||
|
||||
// Task graph with per-processor computation costs and weighted dependency
// edges, stored as adjacency lists.
//
// Nodes are identified by their index in [0, num_nodes). Callers that add an
// edge u -> v are expected to record it twice: once in successors[u] and once
// in predecessors[v], each time paired with the edge's communication cost.
struct DAG {
    int num_nodes;
    int num_processors;

    // Computation cost matrix, flattened row-major (num_nodes x num_processors):
    // the cost of `task` on `proc` lives at index task * num_processors + proc.
    std::vector<float> comp_costs;

    // successors[u]   : list of (target node, communication cost)
    // predecessors[v] : list of (source node, communication cost)
    std::vector<std::vector<std::pair<int, float>>> successors;
    std::vector<std::vector<std::pair<int, float>>> predecessors;

    // Creates a graph with n nodes and p processors, all computation costs set
    // to zero and no edges. The matrix size is computed in size_t so that large
    // n * p products do not overflow int.
    DAG(int n, int p)
        : num_nodes(n),
          num_processors(p),
          comp_costs(static_cast<std::size_t>(n) * static_cast<std::size_t>(p), 0.0f),
          successors(n),
          predecessors(n) {}

    // Returns the computation cost of `task` on processor `proc`.
    // No bounds checking is performed.
    inline float get_comp_cost(int task, int proc) const {
        const std::size_t row = static_cast<std::size_t>(task) * static_cast<std::size_t>(num_processors);
        return comp_costs[row + static_cast<std::size_t>(proc)];
    }
};
|
||||
|
||||
// Placement chosen by the scheduler for one task: the processor it runs on
// and the time window it occupies there.
//
// Kept as a plain aggregate so it can be brace-initialized as
// {processor, start_time, end_time} and value-initialized to all zeros.
struct TaskSchedule {
    int processor;     // index of the processor the task is assigned to
    float start_time;  // time at which the task starts on that processor
    float end_time;    // time at which the task finishes on that processor
};
|
||||
|
||||
// ==========================================
|
||||
// 2. DAG GENERATOR (REAL-WORLD SKEW)
|
||||
// ==========================================
|
||||
// Builds a pseudo-random layered DAG whose out-degree distribution is skewed
// by a small number of "super hub" nodes.
//
// Parameters:
//   num_nodes       number of tasks in the graph.
//   num_processors  number of processors (columns of the cost matrix).
//   levels          number of layers the nodes are spread over; values below 1
//                   are treated as 1.
//   ccr             communication-to-computation ratio. Edge costs are drawn
//                   uniformly from [0.5, 1.5] * ccr * 55, where 55 is the mean
//                   of the computation-cost range. Negative values are treated
//                   as 0.
//
// Construction:
//   * Every (task, processor) computation cost is uniform in [10, 100).
//   * Node 0 goes to the first level, node num_nodes - 1 to the last level and
//     every other node to a uniformly random level.
//   * Each node in a non-final level gets one edge to a random node of the next
//     non-empty level, plus extra edges to random nodes in that level or any
//     later one: 2000 extras with probability 0.5% (super hub), otherwise 1-3
//     extras with probability 20%. Extra edges may duplicate existing ones.
//
// The generator is seeded with a fixed value, so the result is reproducible for
// a given standard-library implementation. The total edge count is printed to
// stdout.
DAG generate_dag(int num_nodes, int num_processors, int levels, float ccr) {
    // Guard against arguments that would make the distributions below invalid.
    levels = std::max(levels, 1);
    ccr = std::max(ccr, 0.0f);

    const int kSuperHubExtraEdges = 2000;

    DAG dag(num_nodes, num_processors);
    std::mt19937 gen(42);  // Fixed seed for reproducibility

    // Iterating the vector itself avoids computing num_nodes * num_processors in int.
    std::uniform_real_distribution<float> comp_dist(10.0f, 100.0f);
    for (float& cost : dag.comp_costs) {
        cost = comp_dist(gen);
    }

    // Spread the nodes over the levels; the first and last node are pinned to
    // the first and last level respectively.
    std::vector<std::vector<int>> nodes_per_level(levels);
    std::uniform_int_distribution<int> lvl_dist(0, levels - 1);
    for (int i = 0; i < num_nodes; ++i) {
        if (i == 0) {
            nodes_per_level[0].push_back(i);
        } else if (i == num_nodes - 1) {
            nodes_per_level[levels - 1].push_back(i);
        } else {
            nodes_per_level[lvl_dist(gen)].push_back(i);
        }
    }

    const float avg_comp = 55.0f;  // mean of the [10, 100) computation-cost range
    std::uniform_real_distribution<float> comm_dist(avg_comp * ccr * 0.5f, avg_comp * ccr * 1.5f);
    std::uniform_real_distribution<float> prob(0.0f, 1.0f);
    std::uniform_int_distribution<int> normal_dist(1, 3);

    long long total_edges = 0;

    // Picks a random member of a non-empty node pool.
    auto pick_node = [&gen](const std::vector<int>& pool) -> int {
        return pool[gen() % pool.size()];
    };

    // Records the edge from -> to (in both adjacency lists) with a random cost.
    auto add_edge = [&](int from, int to) {
        const float comm = comm_dist(gen);
        dag.successors[from].push_back({to, comm});
        dag.predecessors[to].push_back({from, comm});
        ++total_edges;
    };

    for (int l = 0; l < levels - 1; ++l) {
        const std::vector<int>& sources = nodes_per_level[l];
        if (sources.empty()) continue;

        // The next non-empty level is the same for every node of this level.
        int target_level = l + 1;
        while (target_level < levels && nodes_per_level[target_level].empty()) {
            ++target_level;
        }
        // No populated level remains after l, so no later level can add edges either.
        if (target_level >= levels) break;

        const std::vector<int>& targets = nodes_per_level[target_level];
        std::uniform_int_distribution<int> future_lvl_dist(target_level, levels - 1);

        for (int u : sources) {
            // 1. BASE DEPENDENCY: every node feeds at least one node further down.
            const int v = pick_node(targets);
            add_edge(u, v);

            // 2. SKEW: 0.5% of nodes become super hubs with 2000 extra children;
            //    the rest have a 20% chance of 1 to 3 extra children.
            const bool is_super_hub = (prob(gen) < 0.005);
            int extra_edges = 0;
            if (is_super_hub) {
                extra_edges = kSuperHubExtraEdges;
            } else if (prob(gen) < 0.2) {
                extra_edges = normal_dist(gen);
            }

            // Extra edges go to a random node of a random level at or after
            // target_level; a draw that lands on an empty level is skipped.
            for (int e = 0; e < extra_edges; ++e) {
                const std::vector<int>& pool = nodes_per_level[future_lvl_dist(gen)];
                if (pool.empty()) continue;

                const int child_v = pick_node(pool);
                add_edge(u, child_v);
            }
        }
    }

    std::cout << " [Generator] Created " << total_edges << " total edges (simulating Hubs).\n";
    return dag;
}
|
||||
|
||||
|
||||
// ==========================================
|
||||
// 3. PEFT SCHEDULER (O(E*P) Optimized)
|
||||
// ==========================================
|
||||
void run_peft(const DAG& dag, vector<TaskSchedule>& final_schedule) {
|
||||
int N = dag.num_nodes;
|
||||
int P = dag.num_processors;
|
||||
|
||||
// 1. Level sorting for parallel Bottom-Up OCT computation
|
||||
vector<int> out_degree(N, 0);
|
||||
for(int i=0; i<N; ++i) out_degree[i] = dag.successors[i].size();
|
||||
|
||||
vector<vector<int>> reverse_levels;
|
||||
queue<int> q;
|
||||
for(int i=0; i<N; ++i) if(out_degree[i] == 0) q.push(i);
|
||||
|
||||
while(!q.empty()) {
|
||||
int size = q.size();
|
||||
vector<int> current_level;
|
||||
for(int i=0; i<size; ++i) {
|
||||
int u = q.front(); q.pop();
|
||||
current_level.push_back(u);
|
||||
for(auto& edge : dag.predecessors[u]) {
|
||||
int p_node = edge.first;
|
||||
if(--out_degree[p_node] == 0) q.push(p_node);
|
||||
}
|
||||
}
|
||||
reverse_levels.push_back(current_level);
|
||||
}
|
||||
|
||||
|
||||
// 2. Compute OCT (Optimistic Cost Table)
|
||||
vector<float> oct(N * P, 0.0f);
|
||||
|
||||
// NEW: Cache array to drop complexity to O(E * P)
|
||||
vector<float> min_oct_comp(N, 0.0f);
|
||||
|
||||
for (const auto& level_nodes : reverse_levels) {
|
||||
#pragma omp parallel for schedule(dynamic)
|
||||
for (int idx = 0; idx < level_nodes.size(); ++idx) {
|
||||
int task = level_nodes[idx];
|
||||
|
||||
vector<float> max_vals(P, 0.0f);
|
||||
|
||||
// Loop over successors first (O(E * P) total instead of O(E * P^2))
|
||||
for (auto& edge : dag.successors[task]) {
|
||||
int succ = edge.first;
|
||||
float comm_cost = edge.second;
|
||||
|
||||
// The precalculated global minimum for this successor
|
||||
float val_diff = min_oct_comp[succ] + comm_cost;
|
||||
|
||||
// Cache-friendly, auto-vectorizable O(P) loop
|
||||
for (int p_j = 0; p_j < P; ++p_j) {
|
||||
float val_same = oct[succ * P + p_j] + dag.get_comp_cost(succ, p_j);
|
||||
float min_w = min(val_same, val_diff);
|
||||
max_vals[p_j] = max(max_vals[p_j], min_w);
|
||||
}
|
||||
}
|
||||
|
||||
// Assign to OCT and precalculate the minimum for the predecessors
|
||||
float task_min_val = 1e9f;
|
||||
for (int p_j = 0; p_j < P; ++p_j) {
|
||||
oct[task * P + p_j] = max_vals[p_j];
|
||||
task_min_val = min(task_min_val, max_vals[p_j] + dag.get_comp_cost(task, p_j));
|
||||
}
|
||||
min_oct_comp[task] = task_min_val;
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Compute Rank_OCT and sort tasks (Phase 1)
|
||||
vector<pair<float, int>> rank_oct(N);
|
||||
#pragma omp parallel for
|
||||
for (int i = 0; i < N; ++i) {
|
||||
float avg_oct = 0;
|
||||
for (int p = 0; p < P; ++p) avg_oct += oct[i * P + p];
|
||||
rank_oct[i] = {avg_oct / P, i};
|
||||
}
|
||||
|
||||
sort(rank_oct.rbegin(), rank_oct.rend());
|
||||
|
||||
// 4. Processor Assignment (Phase 2)
|
||||
final_schedule.resize(N);
|
||||
vector<float> avail(P, 0.0f);
|
||||
|
||||
for (int i = 0; i < N; ++i) {
|
||||
int task = rank_oct[i].second;
|
||||
|
||||
int best_p = -1;
|
||||
float min_o_eft = 1e9f;
|
||||
float best_est = 0.0f;
|
||||
float best_eft = 0.0f;
|
||||
|
||||
for (int p = 0; p < P; ++p) {
|
||||
float data_ready_time = 0.0f;
|
||||
for (auto& pred_edge : dag.predecessors[task]) {
|
||||
int pred = pred_edge.first;
|
||||
float comm = pred_edge.second;
|
||||
int pred_p = final_schedule[pred].processor;
|
||||
float comm_penalty = (pred_p == p) ? 0.0f : comm;
|
||||
data_ready_time = max(data_ready_time, final_schedule[pred].end_time + comm_penalty);
|
||||
}
|
||||
|
||||
float est = max(avail[p], data_ready_time);
|
||||
float eft = est + dag.get_comp_cost(task, p);
|
||||
float o_eft = eft + oct[task * P + p];
|
||||
|
||||
if (o_eft < min_o_eft) {
|
||||
min_o_eft = o_eft;
|
||||
best_p = p;
|
||||
best_est = est;
|
||||
best_eft = eft;
|
||||
}
|
||||
}
|
||||
|
||||
final_schedule[task] = {best_p, best_est, best_eft};
|
||||
avail[best_p] = best_eft;
|
||||
}
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// 4. VISUALIZATION EXPORTERS (DOT)
|
||||
// ==========================================
|
||||
|
||||
void export_dag_to_dot(const DAG& dag, const string& filename) {
|
||||
ofstream out(filename);
|
||||
out << "digraph RawDAG {\n";
|
||||
out << " rankdir=TB;\n";
|
||||
out << " node [shape=record, style=filled, fillcolor=lightgrey, fontname=\"Helvetica\"];\n";
|
||||
out << " edge [fontname=\"Helvetica\", fontsize=10];\n\n";
|
||||
|
||||
for (int i = 0; i < dag.num_nodes; ++i) {
|
||||
out << " Task_" << i << " [label=\"Task " << i << "\"];\n";
|
||||
}
|
||||
|
||||
out << "\n";
|
||||
for (int i = 0; i < dag.num_nodes; ++i) {
|
||||
for (const auto& edge : dag.successors[i]) {
|
||||
out << " Task_" << i << " -> Task_" << edge.first
|
||||
<< " [label=\"" << (int)edge.second << "\"];\n";
|
||||
}
|
||||
}
|
||||
|
||||
out << "}\n";
|
||||
out.close();
|
||||
}
|
||||
|
||||
void export_schedule_to_dot(const DAG& dag, const vector<TaskSchedule>& schedule, const string& filename) {
|
||||
ofstream out(filename);
|
||||
out << "digraph ScheduledDAG {\n";
|
||||
out << " rankdir=TB;\n";
|
||||
// FIX 1: Change shape to standard 'box' to prevent the flat edge warning
|
||||
out << " node [shape=box, fontname=\"Helvetica\", style=filled, fillcolor=white, rounded=true];\n";
|
||||
out << " edge [fontname=\"Helvetica\", fontsize=10];\n\n";
|
||||
|
||||
vector<vector<int>> proc_tasks(dag.num_processors);
|
||||
for (int i = 0; i < dag.num_nodes; ++i) {
|
||||
proc_tasks[schedule[i].processor].push_back(i);
|
||||
}
|
||||
|
||||
for (int p = 0; p < dag.num_processors; ++p) {
|
||||
if (proc_tasks[p].empty()) continue;
|
||||
|
||||
sort(proc_tasks[p].begin(), proc_tasks[p].end(), [&schedule](int a, int b) {
|
||||
return schedule[a].start_time < schedule[b].start_time;
|
||||
});
|
||||
|
||||
out << " subgraph cluster_P" << p << " {\n";
|
||||
out << " label=\"Processor " << p << "\";\n";
|
||||
out << " fontname=\"Helvetica-Bold\";\n";
|
||||
out << " style=rounded;\n";
|
||||
out << " bgcolor=\"#f0f8ff\";\n";
|
||||
out << " color=blue;\n\n";
|
||||
|
||||
for (int task : proc_tasks[p]) {
|
||||
// FIX 1 cont: Use standard \n linebreaks instead of the record | syntax
|
||||
out << " Task_" << task
|
||||
<< " [label=\"Task " << task
|
||||
<< "\\nStart: " << schedule[task].start_time
|
||||
<< "\\nEnd: " << schedule[task].end_time << "\"];\n";
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < proc_tasks[p].size() - 1; ++i) {
|
||||
out << " Task_" << proc_tasks[p][i] << " -> Task_" << proc_tasks[p][i+1]
|
||||
<< " [style=invis, weight=10];\n";
|
||||
}
|
||||
|
||||
out << " }\n\n";
|
||||
}
|
||||
|
||||
for (int i = 0; i < dag.num_nodes; ++i) {
|
||||
for (const auto& edge : dag.successors[i]) {
|
||||
int target = edge.first;
|
||||
bool same_proc = (schedule[i].processor == schedule[target].processor);
|
||||
|
||||
// FIX 2: If on the same processor, add 'constraint=false'.
|
||||
// The invisible edges already handle the layout, so don't let this edge confuse the solver.
|
||||
out << " Task_" << i << " -> Task_" << target
|
||||
<< " [label=\"" << (int)edge.second << "\""
|
||||
<< (same_proc ? ", constraint=false]" : ", color=red, fontcolor=red, style=dashed]")
|
||||
<< ";\n";
|
||||
}
|
||||
}
|
||||
|
||||
out << "}\n";
|
||||
out.close();
|
||||
}
|
||||
|
||||
|
||||
// ==========================================
|
||||
// 5. MAIN
|
||||
// ==========================================
|
||||
int main() {
|
||||
// Testing with a small graph to ensure DOT generation runs
|
||||
int N = 30000;
|
||||
int P = 1000;
|
||||
|
||||
cout << "Generating DAG with " << N << " nodes and " << P << " processors..." << endl;
|
||||
auto start_gen = chrono::high_resolution_clock::now();
|
||||
DAG dag = generate_dag(N, P, 300, 1.0f); // 10 levels for a small graph
|
||||
auto end_gen = chrono::high_resolution_clock::now();
|
||||
cout << "DAG Generation took: " << chrono::duration<double>(end_gen - start_gen).count() << " s\n";
|
||||
|
||||
vector<TaskSchedule> schedule;
|
||||
|
||||
cout << "Running PEFT Scheduling..." << endl;
|
||||
auto start_sched = chrono::high_resolution_clock::now();
|
||||
run_peft(dag, schedule);
|
||||
auto end_sched = chrono::high_resolution_clock::now();
|
||||
|
||||
cout << "Scheduling took: " << chrono::duration<double>(end_sched - start_sched).count() << " s\n\n";
|
||||
|
||||
// ==========================================
|
||||
// METRICS REPORT
|
||||
// ==========================================
|
||||
float makespan = 0.0f;
|
||||
for (int i = 0; i < N; ++i) makespan = max(makespan, schedule[i].end_time);
|
||||
|
||||
// Calculate Sequential Makespan (If we ran all tasks on the single fastest processor)
|
||||
float best_seq_makespan = 1e9f;
|
||||
int best_seq_processor = -1;
|
||||
for (int p = 0; p < P; ++p) {
|
||||
float current_seq = 0.0f;
|
||||
for (int i = 0; i < N; ++i) current_seq += dag.get_comp_cost(i, p);
|
||||
if (current_seq < best_seq_makespan) {
|
||||
best_seq_makespan = current_seq;
|
||||
best_seq_processor = p;
|
||||
}
|
||||
}
|
||||
|
||||
float time_gained = best_seq_makespan - makespan;
|
||||
float speedup = best_seq_makespan / makespan;
|
||||
|
||||
cout << "--- METRICS REPORT ---\n";
|
||||
cout << "Sequential Time (CPU " << best_seq_processor << "): " << best_seq_makespan << " units\n";
|
||||
cout << "Parallel PEFT Makespan: " << makespan << " units\n";
|
||||
cout << "Total Time Gained: " << time_gained << " units\n";
|
||||
cout << "Overall Speedup: " << speedup << "x\n";
|
||||
cout << "----------------------\n";
|
||||
|
||||
// Generate visualization only for small graphs
|
||||
if (N <= 50) {
|
||||
cout << "\nGraph size is small (N <= 50). Generating Graphviz DOT files...\n";
|
||||
export_dag_to_dot(dag, "dag_raw.dot");
|
||||
export_schedule_to_dot(dag, schedule, "dag_scheduled.dot");
|
||||
|
||||
cout << "Saved 'dag_raw.dot' and 'dag_scheduled.dot'.\n";
|
||||
cout << "To render images, run the following commands in your terminal:\n";
|
||||
cout << " dot -Tpng dag_raw.dot -o dag_raw.png\n";
|
||||
cout << " dot -Tpng dag_scheduled.dot -o dag_scheduled.png\n";
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user