在多核CPU环境下,实现一个管道过滤器程序,其中任务函数对象间存在依赖关系,可以通过结合拓扑排序和线程池技术来实现高效的任务调度。以下是一个示例代码,演示如何使用C++实现这样一个系统:
1. 任务和依赖关系
我们将任务视作一个有向无环图(DAG)的节点,每个节点有可能依赖其他节点。拓扑排序将帮助我们确定任务的执行顺序,以保证所有依赖关系都被满足。
2. 实现步骤
- 定义任务:每个任务函数对象定义了一个任务的执行逻辑。
- 构建依赖图:用图结构来表示任务之间的依赖关系。
- 拓扑排序:计算任务的执行顺序,保证所有依赖被满足。
- 线程池执行:使用线程池并行执行任务。
示例代码
以下示例代码展示了如何结合这些技术来实现任务的调度:
#include <iostream>
#include <vector>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <thread>
#include <functional>
#include <future>
#include <mutex>
// 任务类,存储任务函数和依赖
class Task {
public:
std::function<void()> func; // 任务执行函数
std::vector<int> dependencies; // 依赖任务的ID
Task(std::function<void()> f, std::vector<int> deps)
: func(f), dependencies(deps) {}
};
// 拓扑排序
std::vector<int> topologicalSort(const std::unordered_map<int, Task>& tasks,
const std::unordered_map<int, std::vector<int>>& graph) {
std::unordered_map<int, int> inDegree; // 记录每个节点的入度
std::queue<int> zeroInDegree; // 入度为0的节点
std::vector<int> sortedOrder;
// 初始化入度
for (const auto& [taskID, task] : tasks) {
inDegree[taskID] = 0;
}
for (const auto& [taskID, dependencies] : graph) {
for (int dep : dependencies) {
inDegree[dep]++;
}
}
// 将入度为0的节点加入队列
for (const auto& [taskID, degree] : inDegree) {
if (degree == 0) {
zeroInDegree.push(taskID);
}
}
// 拓扑排序
while (!zeroInDegree.empty()) {
int current = zeroInDegree.front();
zeroInDegree.pop();
sortedOrder.push_back(current);
for (int neighbor : graph.at(current)) {
if (--inDegree[neighbor] == 0) {
zeroInDegree.push(neighbor);
}
}
}
return sortedOrder;
}
// 执行任务的函数
void executeTasks(const std::vector<int>& taskOrder,
const std::unordered_map<int, Task>& tasks) {
std::unordered_map<int, std::future<void>> futures;
std::mutex mtx;
for (int taskID : taskOrder) {
const Task& task = tasks.at(taskID);
// 使用 std::async 执行任务
futures[taskID] = std::async(std::launch::async, [task, &mtx]() {
// 确保任务函数是线程安全的
{
std::lock_guard<std::mutex> lock(mtx);
std::cout << "Executing task " << taskID << std::endl;
}
task.func();
});
}
// 等待所有任务完成
for (auto& [taskID, future] : futures) {
future.get();
}
}
int main() {
// 定义任务
std::unordered_map<int, Task> tasks = {
{1, Task([]() { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "Task 1\n"; }, {})},
{2, Task([]() { std::cout << "Task 2\n"; }, {1})},
{3, Task([]() { std::cout << "Task 3\n"; }, {1})},
{4, Task([]() { std::cout << "Task 4\n"; }, {2, 3})}
};
// 构建依赖图
std::unordered_map<int, std::vector<int>> graph;
for (const auto& [taskID, task] : tasks) {
for (int dep : task.dependencies) {
graph[dep].push_back(taskID);
}
}
// 执行拓扑排序
std::vector<int> sortedOrder = topologicalSort(tasks, graph);
// 执行任务
executeTasks(sortedOrder, tasks);
return 0;
}
在前面的示例中,各个任务并不是在同一个线程中执行的,而是使用了 std::async
将任务分派到可能的多个线程中进行异步执行。这样可以充分利用多核CPU的性能,实现并行执行。
如何实现并行执行:
-
std::async
的使用:std::async(std::launch::async, ...)
的调用会启动新的线程来执行任务。这使得每个任务可以并行运行,从而在多核CPU上加速任务的执行。
-
任务线程的调度:
- 在
executeTasks
函数中,每个任务会被交给std::async
进行处理。这样,每个任务的执行是独立的,不会阻塞其他任务的执行。
- 在
-
同步与等待:
- 在任务执行的最后,使用
future.get()
来等待所有异步任务完成。这保证了主线程在所有任务执行完毕后再继续后续操作。
- 在任务执行的最后,使用
举个例子
假设我们有四个任务:Task 1、Task 2、Task 3 和 Task 4。如果只是在同一个线程中执行这些任务,那么它们将一个接一个地依次执行。而使用 std::async
,这些任务可能会被调度到不同的线程中,具体取决于可用的CPU核心数量和操作系统的线程调度策略。
示例运行时情况
-
串行执行:
- 任务1 -> 任务2 -> 任务3 -> 任务4
- 总时间 = 任务1 + 任务2 + 任务3 + 任务4
-
并行执行:
- 任务1(线程1) | 任务2(线程2) | 任务3(线程3) | 任务4(线程4)
- 总时间 = max(任务1, 任务2, 任务3, 任务4) — 这一般会远小于串行执行的总时间。
结论
因此,可以确认,在示例中各个任务并不是在同一个线程中执行的,而是尽可能地并行化执行,以提高性能和效率。这是多核处理器的一个重要优势。