1 组成
-
线程(生产者)
- 发布任务
-
队列
- 任务上下文
- 任务执行函数
-
线程池(消费者)
- 取出任务
- 执行任务
- 线程调度方式
2 作用
- 复用线程资源
- 节省线程创建的销毁的开销
- 可以异步处理生产者发布的任务
- 提高处理多个任务的效率
3 封装
3.1 变量对象
- 任务队列
- 工作线程数组(向量)
- 互斥锁
- 条件变量
- 状态标志
- 线程数量
- 原子变量(任务数量)
3.2 任务结构体
struct TaskFunc {
TaskFunc(expireTime) : _expireTime(expireTime) {}
std::function<void()> _func;
uint64_t _expireTime = 0;
}
typedef std::shared_pte<TaskFunc> TaskFuncPtr;
3.3 构造函数
- 初始化线程数量
- 初始化状态标志
ThreadPool::ThreadPool() :thread_num_(1), terminate_(false) {}
3.4 析构函数
- 执行
Stop
函数
ThreadPool::~ThreadPool() {
Stop();
}
3.5 线程池初始化
- 支持手动设置线程数量
bool ThreadPool::Init(int64_t num) {
std::unique_lock<std::mutex> lock(mutex_);
if(!threads_.empty())
return false;
thread_num_ = num;
return true;
}
3.6 线程池停止
- 修改状态标志
- 唤醒所有线程,让他们在后台执行完后退出,释放
- 清空线程数组
void ThreadPool::Stop() {
{
std::unique_lock<std::mutex> lock(mutex_);
terminate_ = true;
condition_.notify_all();
}
for(int i = 0; i < threads_.size(); i++) {
if(threads_[i]->joinable())
threads_[i]->join();
delete threads_[i];
threads_[i] = nullptr;
}
std::unique_lock<std::mutex> lock(mutex_);
threads_.clear();
}
3.7 获取线程池线程数量
int ThreadPool::GetThreadNum() {
return threads_.size();
}
3.8 获取任务数量
int ThreadPool::GetTaskNum() {
return tasks_.size();
}
3.9 线程池开启
- 创建
thread_num
个线程 - 用
Run
函数作为线程的执行函数
bool ThreadPool::Start() {
std::unique_lock<std::mutex> lock(mutex_);
if(!threads_.empty())
return false;
for(size_t i = 0; i < thread_num_; i++) {
threads_.push_back(new std::thread(&ThreadPool::Run, this));
}
return true;
}
3.10 传入任务
- 将
(限定时间)
,任务函数
,任务呢函数添加进任务队列
auto Exec(F &&f, Args... args) -> std::future<decltype(f(args...))> {
Exec(0, f, args...);
}
3.11 等待函数
- 如果任务队列为空,返回
- 如果传入时间小于0,等到所有任务执行完
- 如果传入时间大于0,规定时间执行任务,返回队列是否为空
bool ThreadPool::WaitForAllDone(int timewait) {
std::unique_lock<std::mutex> lock(mutex_);
if(tasks_.empty())
return true;
if(timewait < 0) {
condition_.wait(lock, [this]{return tasks_.empty();});
return true;
}
else {
return condition_.wait_for(lock, std::chrono::milliseconds(timewait),
[this]{return tasks_.empty();});
}
}
3.12 获取任务
- 如果任务队列为空,释放互斥锁等待;直到有任务或者认为 停止线程池
- 如果任务队列不为空,获取并且释放一个任务
bool ThreadPool::Get(TaskFuncPtr &task) {
std::unique_lock<std::mutex> lock(mutex_);
if(tasks_.empty()) {
condition_.wait(lock, [this] {
return terminate_ || !tasks_.empty();
});
}
if(terminate_)
return false;
if(!tasks_.empty()) {
task = std::move(tasks_.front());
tasks_.pop();
return true;
}
return false;
}
3.13 执行任务
- 如果成功获取任务,执行任务+1,执行任务
- 执行任务结束后,执行任务-1
- 通知等待函数
void ThreadPool::Run() {
while (!terminate_) {
TaskFuncPtr task;
bool ok = Get(task);
if(ok) {
atomic_++;
try {
if(task->_expireTime > 0 && task->_expireTime > GetNowMs()) {
//任务超时处理
}
else
task->_func;
}
catch (...) {}
atomic_--;
std::unique_lock<std::mutex> lock(mutex_);
if(atomic_ == 0 && tasks_.empty())
condition_.notify_all();
}
}
}