// PEFT (Predict Earliest Finish Time) list-scheduling demo.
//
// Generates a layered random DAG with a few high-fan-out "hub" nodes, schedules
// it on heterogeneous processors with PEFT, prints timing/speedup metrics and,
// for small graphs, writes Graphviz DOT files.
//
// Define PEFT_NO_MAIN to compile the generator/scheduler/exporters without the
// demo driver (e.g. when building this file into a test harness).

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <fstream>
#include <iostream>
#include <limits>
#include <queue>
#include <random>
#include <string>
#include <utility>
#include <vector>

// ==========================================
// 1. DATA STRUCTURES
// ==========================================

// DAG stored as adjacency lists plus a dense task-by-processor cost matrix.
struct DAG {
    int num_nodes;
    int num_processors;

    // Computation cost matrix, flattened row-major: num_nodes x num_processors.
    std::vector<float> comp_costs;

    // Edges: node -> list of (neighbour, communication cost).
    // Every edge u -> v appears once in successors[u] and once in predecessors[v].
    std::vector<std::vector<std::pair<int, float>>> successors;
    std::vector<std::vector<std::pair<int, float>>> predecessors;

    // Creates a DAG with n tasks, p processors, all costs zero and no edges.
    DAG(int n, int p)
        : num_nodes(n),
          num_processors(p),
          comp_costs(static_cast<std::size_t>(n) * static_cast<std::size_t>(p), 0.0f),
          successors(n),
          predecessors(n) {}

    // Cost of running `task` on processor `proc` (no bounds checking).
    inline float get_comp_cost(int task, int proc) const {
        return comp_costs[static_cast<std::size_t>(task) * num_processors + proc];
    }
};

// Final placement of one task. run_peft() leaves processor == -1 for a task it
// could not schedule (only possible when the input graph contains a cycle).
struct TaskSchedule {
    int processor;
    float start_time;
    float end_time;
};

// ==========================================
// 2. DAG GENERATOR (REAL-WORLD SKEW)
// ==========================================

// Builds a reproducible (fixed seed 42) layered DAG.
//
//   num_nodes      number of tasks
//   num_processors number of processors (width of the cost matrix)
//   levels         number of layers; values below 1 are treated as 1
//   ccr            communication-to-computation ratio; must be >= 0
//
// Node 0 is placed in the first level and node num_nodes-1 in the last; every
// other node lands in a uniformly random level. Each node outside the last
// level gets one edge into the next non-empty level, plus extra edges into any
// later level: 2000 for the ~0.5% "super hubs", otherwise 1-3 with 20%
// probability. Duplicate edges between the same pair of nodes are possible.
// Edges only ever point to a strictly later level, so the result is acyclic.
DAG generate_dag(int num_nodes, int num_processors, int levels, float ccr) {
    // With levels <= 0 the level table would be empty and levels - 1 would
    // index out of range, so clamp to a single level.
    levels = std::max(levels, 1);

    const double kSuperHubProbability = 0.005;
    const int kSuperHubExtraEdges = 2000;
    const double kExtraEdgeProbability = 0.2;

    DAG dag(num_nodes, num_processors);
    std::mt19937 gen(42);  // Fixed seed for reproducibility

    std::uniform_real_distribution<float> comp_dist(10.0f, 100.0f);
    for (float& cost : dag.comp_costs) {
        cost = comp_dist(gen);
    }

    std::vector<std::vector<int>> nodes_per_level(levels);
    std::uniform_int_distribution<int> lvl_dist(0, levels - 1);
    for (int i = 0; i < num_nodes; ++i) {
        if (i == 0) {
            nodes_per_level[0].push_back(i);
        } else if (i == num_nodes - 1) {
            nodes_per_level[levels - 1].push_back(i);
        } else {
            nodes_per_level[lvl_dist(gen)].push_back(i);
        }
    }

    // 55 is the mean of the [10, 100] computation-cost range above.
    const float avg_comp = 55.0f;
    std::uniform_real_distribution<float> comm_dist(avg_comp * ccr * 0.5f,
                                                    avg_comp * ccr * 1.5f);
    // NOTE(review): the element type of this distribution was lost in the copy
    // of the file under review; double is assumed from the double literals.
    std::uniform_real_distribution<double> prob(0.0, 1.0);
    long long total_edges = 0;

    // Draws a communication cost and records u -> v in both adjacency lists.
    auto add_edge = [&](int u, int v) {
        const float comm = comm_dist(gen);
        dag.successors[u].push_back({v, comm});
        dag.predecessors[v].push_back({u, comm});
        ++total_edges;
    };

    for (int l = 0; l < levels - 1; ++l) {
        if (nodes_per_level[l].empty()) continue;

        // The next non-empty level is the same for every node of this level.
        int target_level = l + 1;
        while (target_level < levels && nodes_per_level[target_level].empty()) {
            ++target_level;
        }
        if (target_level >= levels) continue;  // nothing downstream to connect to

        const std::vector<int>& base_targets = nodes_per_level[target_level];
        std::uniform_int_distribution<int> future_lvl_dist(target_level, levels - 1);

        for (int u : nodes_per_level[l]) {
            // 1. BASE DEPENDENCY: every node feeds at least one node downstream.
            add_edge(u, base_targets[gen() % base_targets.size()]);

            // 2. SKEW: a few broadcast-style hubs, mostly sparse normal nodes.
            int extra_edges = 0;
            if (prob(gen) < kSuperHubProbability) {
                extra_edges = kSuperHubExtraEdges;
            } else if (prob(gen) < kExtraEdgeProbability) {
                std::uniform_int_distribution<int> normal_dist(1, 3);
                extra_edges = normal_dist(gen);
            }

            // Extra edges go to a random node in any later level; a draw that
            // hits an empty level is simply dropped.
            for (int e = 0; e < extra_edges; ++e) {
                const std::vector<int>& candidates = nodes_per_level[future_lvl_dist(gen)];
                if (candidates.empty()) continue;
                add_edge(u, candidates[gen() % candidates.size()]);
            }
        }
    }

    std::cout << " [Generator] Created " << total_edges << " total edges (simulating Hubs).\n";
    return dag;
}

// ==========================================
// 3. PEFT SCHEDULER (O(E*P) Optimized)
// ==========================================

// Schedules `dag` with PEFT and writes one entry per task to `final_schedule`
// (resized to dag.num_nodes; previous contents are discarded).
//
// Phase 1 builds the Optimistic Cost Table (OCT) bottom-up, one reverse
// topological level at a time; tasks inside a level are independent and are
// processed in parallel when OpenMP is enabled.
// Phase 2 repeatedly takes the READY task with the highest rank_oct and places
// it on the processor minimising EFT + OCT. A ready list is required because
// rank_oct order alone is not a topological order: a parent can rank below its
// child, and scheduling the child first would use the parent's not-yet-computed
// finish time.
//
// If the graph has a cycle, the tasks on or behind it are never ready and keep
// processor == -1. With num_processors <= 0 every task keeps processor == -1.
void run_peft(const DAG& dag, std::vector<TaskSchedule>& final_schedule) {
    const int N = dag.num_nodes;
    const int P = dag.num_processors;
    const float kInf = std::numeric_limits<float>::infinity();

    if (N <= 0) {
        final_schedule.clear();
        return;
    }
    final_schedule.assign(static_cast<std::size_t>(N), TaskSchedule{-1, 0.0f, 0.0f});
    if (P <= 0) return;

    // 1. Level sorting for parallel Bottom-Up OCT computation.
    // NOTE(review): this step was partly missing from the copy of the file
    // under review and has been reconstructed (Kahn's algorithm on the reversed
    // graph, emitting one level per BFS wave) - confirm against the original.
    std::vector<int> out_degree(N, 0);
    for (int i = 0; i < N; ++i) {
        out_degree[i] = static_cast<int>(dag.successors[i].size());
    }

    std::vector<std::vector<int>> reverse_levels;
    std::queue<int> q;
    for (int i = 0; i < N; ++i) {
        if (out_degree[i] == 0) q.push(i);
    }
    while (!q.empty()) {
        const std::size_t wave_size = q.size();
        std::vector<int> current_level;
        current_level.reserve(wave_size);
        for (std::size_t k = 0; k < wave_size; ++k) {
            const int u = q.front();
            q.pop();
            current_level.push_back(u);
            for (const auto& edge : dag.predecessors[u]) {
                if (--out_degree[edge.first] == 0) q.push(edge.first);
            }
        }
        reverse_levels.push_back(std::move(current_level));
    }

    // 2. OCT computation. oct[t * P + p] is the optimistic remaining cost after
    // task t when t runs on processor p.
    std::vector<float> oct(static_cast<std::size_t>(N) * static_cast<std::size_t>(P), 0.0f);
    // min_oct_comp[t] = min over p of (oct[t][p] + comp_cost(t, p)). Caching it
    // drops the table construction from O(E * P^2) to O(E * P).
    std::vector<float> min_oct_comp(N, 0.0f);

    for (const auto& level_nodes : reverse_levels) {
        const int level_size = static_cast<int>(level_nodes.size());
        #pragma omp parallel for schedule(dynamic)
        for (int idx = 0; idx < level_size; ++idx) {
            const int task = level_nodes[idx];
            // Each task owns its row, and all successor rows were finished in
            // an earlier level, so the row can be accumulated in place.
            float* task_oct = &oct[static_cast<std::size_t>(task) * P];

            for (const auto& edge : dag.successors[task]) {
                const int succ = edge.first;
                const float* succ_oct = &oct[static_cast<std::size_t>(succ) * P];
                // Best case when the successor runs on a different processor.
                const float val_diff = min_oct_comp[succ] + edge.second;

                for (int p_j = 0; p_j < P; ++p_j) {
                    // Best case when the successor stays on p_j (no transfer).
                    const float val_same = succ_oct[p_j] + dag.get_comp_cost(succ, p_j);
                    task_oct[p_j] = std::max(task_oct[p_j], std::min(val_same, val_diff));
                }
            }

            float task_min_val = kInf;
            for (int p_j = 0; p_j < P; ++p_j) {
                task_min_val = std::min(task_min_val, task_oct[p_j] + dag.get_comp_cost(task, p_j));
            }
            min_oct_comp[task] = task_min_val;
        }
    }

    // 3. rank_oct = average OCT over all processors (Phase 1 priority).
    std::vector<float> rank_oct(N, 0.0f);
    #pragma omp parallel for
    for (int i = 0; i < N; ++i) {
        float sum_oct = 0;
        for (int p = 0; p < P; ++p) sum_oct += oct[static_cast<std::size_t>(i) * P + p];
        rank_oct[i] = sum_oct / P;
    }

    // 4. Processor Assignment (Phase 2) driven by a ready list.
    // The max-heap on (rank, task id) reproduces the tie-breaking of a
    // descending sort, so whenever plain rank order is already topologically
    // valid the resulting schedule is the same.
    std::vector<int> pending_preds(N, 0);
    std::priority_queue<std::pair<float, int>> ready;
    for (int i = 0; i < N; ++i) {
        pending_preds[i] = static_cast<int>(dag.predecessors[i].size());
        if (pending_preds[i] == 0) ready.emplace(rank_oct[i], i);
    }

    std::vector<float> avail(P, 0.0f);  // time at which each processor is free
    while (!ready.empty()) {
        const int task = ready.top().second;
        ready.pop();

        int best_p = -1;
        float min_o_eft = kInf;
        float best_est = 0.0f;
        float best_eft = 0.0f;

        for (int p = 0; p < P; ++p) {
            // Latest arrival of any input; transfers within p are free.
            float data_ready_time = 0.0f;
            for (const auto& pred_edge : dag.predecessors[task]) {
                const TaskSchedule& pred = final_schedule[pred_edge.first];
                const float comm_penalty = (pred.processor == p) ? 0.0f : pred_edge.second;
                data_ready_time = std::max(data_ready_time, pred.end_time + comm_penalty);
            }

            const float est = std::max(avail[p], data_ready_time);
            const float eft = est + dag.get_comp_cost(task, p);
            const float o_eft = eft + oct[static_cast<std::size_t>(task) * P + p];

            // best_p < 0 guarantees a valid choice even if every o_eft is
            // infinite or NaN.
            if (best_p < 0 || o_eft < min_o_eft) {
                min_o_eft = o_eft;
                best_p = p;
                best_est = est;
                best_eft = eft;
            }
        }

        final_schedule[task] = {best_p, best_est, best_eft};
        avail[best_p] = best_eft;

        for (const auto& edge : dag.successors[task]) {
            if (--pending_preds[edge.first] == 0) {
                ready.emplace(rank_oct[edge.first], edge.first);
            }
        }
    }
}

// ==========================================
// 4. VISUALIZATION EXPORTERS (DOT)
// ==========================================

// Writes the unscheduled DAG to `filename` in Graphviz DOT format. Edge labels
// are communication costs truncated to integers. Reports to stderr and returns
// without writing if the file cannot be opened.
void export_dag_to_dot(const DAG& dag, const std::string& filename) {
    std::ofstream out(filename);
    if (!out) {
        std::cerr << "Could not open '" << filename << "' for writing.\n";
        return;
    }

    out << "digraph RawDAG {\n";
    out << " rankdir=TB;\n";
    out << " node [shape=record, style=filled, fillcolor=lightgrey, fontname=\"Helvetica\"];\n";
    out << " edge [fontname=\"Helvetica\", fontsize=10];\n\n";

    for (int i = 0; i < dag.num_nodes; ++i) {
        out << " Task_" << i << " [label=\"Task " << i << "\"];\n";
    }
    out << "\n";

    for (int i = 0; i < dag.num_nodes; ++i) {
        for (const auto& edge : dag.successors[i]) {
            out << " Task_" << i << " -> Task_" << edge.first << " [label=\""
                << static_cast<int>(edge.second) << "\"];\n";
        }
    }
    out << "}\n";
}

// Writes the scheduled DAG to `filename` in Graphviz DOT format: one cluster
// per used processor with its tasks chained in start-time order, plus the real
// dependency edges (cross-processor edges drawn red and dashed). Tasks whose
// processor index is outside [0, num_processors) are left out of the clusters.
// Reports to stderr and returns without writing if the file cannot be opened.
void export_schedule_to_dot(const DAG& dag, const std::vector<TaskSchedule>& schedule,
                            const std::string& filename) {
    std::ofstream out(filename);
    if (!out) {
        std::cerr << "Could not open '" << filename << "' for writing.\n";
        return;
    }

    out << "digraph ScheduledDAG {\n";
    out << " rankdir=TB;\n";
    // Plain boxes avoid Graphviz's flat-edge warning for record shapes.
    // "rounded" is a style value in Graphviz, not a standalone attribute.
    out << " node [shape=box, fontname=\"Helvetica\", style=\"filled,rounded\", fillcolor=white];\n";
    out << " edge [fontname=\"Helvetica\", fontsize=10];\n\n";

    std::vector<std::vector<int>> proc_tasks(dag.num_processors > 0 ? dag.num_processors : 0);
    for (int i = 0; i < dag.num_nodes; ++i) {
        const int proc = schedule[i].processor;
        if (proc < 0 || proc >= dag.num_processors) continue;  // unscheduled task
        proc_tasks[proc].push_back(i);
    }

    for (int p = 0; p < dag.num_processors; ++p) {
        std::vector<int>& tasks = proc_tasks[p];
        if (tasks.empty()) continue;

        std::sort(tasks.begin(), tasks.end(), [&schedule](int a, int b) {
            return schedule[a].start_time < schedule[b].start_time;
        });

        out << " subgraph cluster_P" << p << " {\n";
        out << " label=\"Processor " << p << "\";\n";
        out << " fontname=\"Helvetica-Bold\";\n";
        out << " style=rounded;\n";
        out << " bgcolor=\"#f0f8ff\";\n";
        out << " color=blue;\n\n";

        for (int task : tasks) {
            // "\\n" emits a literal \n, which DOT renders as a line break.
            out << " Task_" << task << " [label=\"Task " << task
                << "\\nStart: " << schedule[task].start_time
                << "\\nEnd: " << schedule[task].end_time << "\"];\n";
        }

        // Invisible, heavily weighted edges keep the tasks of one processor
        // stacked in execution order.
        for (std::size_t i = 0; i + 1 < tasks.size(); ++i) {
            out << " Task_" << tasks[i] << " -> Task_" << tasks[i + 1]
                << " [style=invis, weight=10];\n";
        }
        out << " }\n\n";
    }

    for (int i = 0; i < dag.num_nodes; ++i) {
        for (const auto& edge : dag.successors[i]) {
            const int target = edge.first;
            const bool same_proc = (schedule[i].processor == schedule[target].processor);

            // Same-processor edges get constraint=false: the invisible chain
            // already fixes their layout, so they must not influence ranking.
            out << " Task_" << i << " -> Task_" << target << " [label=\""
                << static_cast<int>(edge.second) << "\""
                << (same_proc ? ", constraint=false]"
                              : ", color=red, fontcolor=red, style=dashed]")
                << ";\n";
        }
    }
    out << "}\n";
}

// ==========================================
// 5. MAIN
// ==========================================

#ifndef PEFT_NO_MAIN
// Demo driver: generates a large DAG, schedules it, and prints metrics.
int main() {
    // Large benchmark configuration. DOT export below only runs for N <= 50,
    // so lower N (and the level count) to get pictures.
    const int N = 30000;
    const int P = 1000;
    const int kLevels = 300;

    std::cout << "Generating DAG with " << N << " nodes and " << P << " processors..." << std::endl;

    const auto start_gen = std::chrono::high_resolution_clock::now();
    DAG dag = generate_dag(N, P, kLevels, 1.0f);
    const auto end_gen = std::chrono::high_resolution_clock::now();
    std::cout << "DAG Generation took: "
              << std::chrono::duration<double>(end_gen - start_gen).count() << " s\n";

    std::vector<TaskSchedule> schedule;
    std::cout << "Running PEFT Scheduling..." << std::endl;
    const auto start_sched = std::chrono::high_resolution_clock::now();
    run_peft(dag, schedule);
    const auto end_sched = std::chrono::high_resolution_clock::now();
    std::cout << "Scheduling took: "
              << std::chrono::duration<double>(end_sched - start_sched).count() << " s\n\n";

    // ==========================================
    // METRICS REPORT
    // ==========================================
    float makespan = 0.0f;
    for (const TaskSchedule& entry : schedule) {
        makespan = std::max(makespan, entry.end_time);
    }

    // Sequential baseline: all tasks on the single processor with the smallest
    // total cost. Tasks form the outer loop so the cost matrix is read
    // row by row; each per-processor sum still adds tasks in index order.
    std::vector<float> seq_time(P, 0.0f);
    for (int i = 0; i < N; ++i) {
        for (int p = 0; p < P; ++p) {
            seq_time[p] += dag.get_comp_cost(i, p);
        }
    }
    float best_seq_makespan = std::numeric_limits<float>::infinity();
    int best_seq_processor = -1;
    for (int p = 0; p < P; ++p) {
        if (seq_time[p] < best_seq_makespan) {
            best_seq_makespan = seq_time[p];
            best_seq_processor = p;
        }
    }

    const float time_gained = best_seq_makespan - makespan;
    const float speedup = best_seq_makespan / makespan;

    std::cout << "--- METRICS REPORT ---\n";
    std::cout << "Sequential Time (CPU " << best_seq_processor << "): " << best_seq_makespan
              << " units\n";
    std::cout << "Parallel PEFT Makespan: " << makespan << " units\n";
    std::cout << "Total Time Gained: " << time_gained << " units\n";
    std::cout << "Overall Speedup: " << speedup << "x\n";
    std::cout << "----------------------\n";

    // Generate visualization only for small graphs
    if (N <= 50) {
        std::cout << "\nGraph size is small (N <= 50). Generating Graphviz DOT files...\n";
        export_dag_to_dot(dag, "dag_raw.dot");
        export_schedule_to_dot(dag, schedule, "dag_scheduled.dot");
        std::cout << "Saved 'dag_raw.dot' and 'dag_scheduled.dot'.\n";
        std::cout << "To render images, run the following commands in your terminal:\n";
        std::cout << " dot -Tpng dag_raw.dot -o dag_raw.png\n";
        std::cout << " dot -Tpng dag_scheduled.dot -o dag_scheduled.png\n";
    }

    return 0;
}
#endif  // PEFT_NO_MAIN